Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,7 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
)
);
Expand Down
243 changes: 220 additions & 23 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,55 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

import io.druid.java.util.common.logger.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
{
private static final Logger log = new Logger(BlockingPool.class);
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

private final BlockingQueue<T> objects;
private final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;

public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = limit > 0 ? new ArrayBlockingQueue<T>(limit) : null;
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;

for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}

this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
}

public int maxSize()
{
return maxSize;
}

@VisibleForTesting
public int getPoolSize()
{
return objects.size();
}

/**
Expand All @@ -58,31 +79,207 @@ public BlockingPool(
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout)
{
checkInitialized();
final T theObject;
try {
if (timeout > -1) {
theObject = timeout > 0 ? poll(timeout) : poll();
} else {
theObject = take();
}
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private T poll()
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
} finally {
lock.unlock();
}
}

private T poll(long timeout) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
} finally {
lock.unlock();
}
}

private T take() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
notEnough.await();
}
return objects.pop();
} finally {
lock.unlock();
}
}

/**
* Take a resource from the pool.
*
* @param elementNum number of resources to take
* @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout.
*
* @throws InterruptedException if interrupted while waiting for a resource to become available
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeout) throws InterruptedException
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeout)
{
Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take.");
final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
checkInitialized();
final List<T> objects;
try {
if (timeout > -1) {
objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum);
} else {
objects = takeBatch(elementNum);
}
return objects == null ? null : new ReferenceCountingResourceHolder<>(
objects,
new Closeable()
{
if (!objects.offer(theObject)) {
log.error("WTF?! Queue offer failed, uh oh...");
@Override
public void close() throws IOException
{
offerBatch(objects);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private List<T> pollBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
);
return list;
}
} finally {
lock.unlock();
}
}

@VisibleForTesting
protected int getQueueSize()
private List<T> pollBatch(int elementNum, long timeout) throws InterruptedException
{
return objects.size();
long nanos = TIME_UNIT.toNanos(timeout);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private List<T> takeBatch(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
notEnough.await();
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private void checkInitialized()
{
Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take.");
}

private void offer(T theObject)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() < maxSize) {
objects.push(theObject);
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}

private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static <T> CombiningSequence<T> create(
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;

public CombiningSequence(
private CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
Expand Down
Loading