Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -348,15 +350,15 @@ public void setup() throws IOException
}
}

StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 250_000_000),
2
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;

import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void teardown()
public static class BenchmarkPool
{
private final AtomicLong numPools = new AtomicLong(0L);
private final StupidPool<Object> pool = new StupidPool<>(
private final NonBlockingPool<Object> pool = new StupidPool<>(
"simpleObject pool",
new Supplier<Object>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -392,15 +394,15 @@ public void setup() throws IOException
}
}

StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("merge", 250_000_000),
2
);
Expand Down
269 changes: 6 additions & 263 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,11 @@

package io.druid.collections;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.ISE;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
public interface BlockingPool<T>
{
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

private final ArrayDeque<T> objects;
private final ReentrantLock lock;
private final Condition notEnough;
private final int maxSize;

public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;

for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}

this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
}

public int maxSize()
{
return maxSize;
}

@VisibleForTesting
public int getPoolSize()
{
return objects.size();
}
int maxSize();

/**
* Take a resource from the pool, waiting up to the
Expand All @@ -81,91 +33,14 @@ public int getPoolSize()
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
ReferenceCountingResourceHolder<T> take(long timeoutMs);

/**
* Take a resource from the pool, waiting if necessary until an element becomes available.
*
* @return a resource
*/
public ReferenceCountingResourceHolder<T> take()
{
checkInitialized();
try {
return wrapObject(takeObject());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
offer(theObject);
}
}
);
}

private T pollObject()
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
return objects.isEmpty() ? null : objects.pop();
} finally {
lock.unlock();
}
}

private T pollObject(long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
return objects.pop();
} finally {
lock.unlock();
}
}

private T takeObject() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.isEmpty()) {
notEnough.await();
}
return objects.pop();
} finally {
lock.unlock();
}
}
ReferenceCountingResourceHolder<T> take();

/**
* Take resources from the pool, waiting up to the
Expand All @@ -176,17 +51,7 @@ private T takeObject() throws InterruptedException
*
* @return a resource, or null if the timeout was reached
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum, long timeoutMs);

/**
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
Expand All @@ -195,127 +60,5 @@ public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum,
*
* @return a resource
*/
public ReferenceCountingResourceHolder<List<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
return wrapObjects(takeObjects(elementNum));
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private ReferenceCountingResourceHolder<List<T>> wrapObjects(List<T> theObjects)
{
return theObjects == null ? null : new ReferenceCountingResourceHolder<>(
theObjects,
new Closeable()
{
@Override
public void close() throws IOException
{
offerBatch(theObjects);
}
}
);
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (objects.size() < elementNum) {
return null;
} else {
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
}
} finally {
lock.unlock();
}
}

private List<T> pollObjects(int elementNum, long timeoutMs) throws InterruptedException
{
long nanos = TIME_UNIT.toNanos(timeoutMs);
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
if (nanos <= 0) {
return null;
}
nanos = notEnough.awaitNanos(nanos);
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private List<T> takeObjects(int elementNum) throws InterruptedException
{
final List<T> list = Lists.newArrayListWithCapacity(elementNum);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (objects.size() < elementNum) {
notEnough.await();
}
for (int i = 0; i < elementNum; i++) {
list.add(objects.pop());
}
return list;
} finally {
lock.unlock();
}
}

private void checkInitialized()
{
Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take.");
}

private void offer(T theObject)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() < maxSize) {
objects.push(theObject);
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}

private void offerBatch(List<T> offers)
{
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (objects.size() + offers.size() <= maxSize) {
for (T offer : offers) {
objects.push(offer);
}
notEnough.signal();
} else {
throw new ISE("Cannot exceed pre-configured maximum size");
}
} finally {
lock.unlock();
}
}
ReferenceCountingResourceHolder<List<T>> takeBatch(int elementNum);
}
Loading