Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.hll.HyperLogLogHash;
Expand Down Expand Up @@ -354,11 +352,9 @@ public void setup() throws IOException
}
}

NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
BlockingPool<ByteBuffer> bufferPool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
2
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
Expand Down Expand Up @@ -412,8 +408,7 @@ public String getFormatString()
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
bufferPool
QueryBenchmarkUtil.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
druidProcessingConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.benchmark.query.QueryBenchmarkUtil;
import io.druid.collections.StupidPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.data.input.InputRow;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -300,11 +300,9 @@ public void setup() throws IOException
}

factory = new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNBenchmark-compute-bufferPool",
new DefaultBlockingPool<>(
new OffheapBufferGenerator("compute", 250000000),
0,
Integer.MAX_VALUE
2
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.hll.HyperLogLogHash;
Expand Down Expand Up @@ -448,11 +446,9 @@ public void setup() throws IOException
}
}

NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
BlockingPool<ByteBuffer> bufferPool = new DefaultBlockingPool<>(
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
2
);

// limit of 2 is required since we simulate both historical merge and broker merge in the same process
Expand Down Expand Up @@ -506,8 +502,7 @@ public String getFormatString()
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
bufferPool
QueryBenchmarkUtil.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
druidProcessingConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.StupidPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.data.input.InputRow;
import io.druid.hll.HyperLogLogHash;
import io.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -273,11 +273,9 @@ public void setup() throws IOException
}

factory = new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNBenchmark-compute-bufferPool",
new DefaultBlockingPool<>(
new OffheapBufferGenerator("compute", 250000000),
0,
Integer.MAX_VALUE
1
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
20 changes: 20 additions & 0 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.druid.collections;

import io.druid.java.util.common.RE;

import java.util.List;

public interface BlockingPool<T>
Expand All @@ -35,6 +37,24 @@ public interface BlockingPool<T>
*/
ReferenceCountingResourceHolder<T> take(long timeoutMs);

/**
* Take a resource from the pool, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeoutMs maximum time to wait for a resource, in milliseconds.
*
* @return a resource, or throw RuntimeException on timeout.
*/
default ReferenceCountingResourceHolder<T> takeOrFailOnTimeout(long timeoutMs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw a checked Timeout exception of some kind?

{
ReferenceCountingResourceHolder<T> result = take(timeoutMs);
if (result == null) {
throw new RE("Failed to get buffer in [%s] ms.", timeoutMs);
} else {
return result;
}
}

/**
* Take a resource from the pool, waiting if necessary until an element becomes available.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.NonBlockingPool;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
Expand All @@ -42,7 +41,6 @@
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -60,21 +58,18 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
private final ListeningExecutorService exec;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final NonBlockingPool<ByteBuffer> bufferPool;

public GroupByMergedQueryRunner(
ExecutorService exec,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
NonBlockingPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<T>> queryables
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
}

@Override
Expand All @@ -86,7 +81,6 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> r
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
querySpecificConfig,
bufferPool,
true
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.BlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
Expand Down Expand Up @@ -71,12 +71,12 @@ public class GroupByQueryEngine
private static final int MISSING_VALUE = -1;

private final Supplier<GroupByQueryConfig> config;
private final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool;
private final BlockingPool<ByteBuffer> intermediateResultsBufferPool;

@Inject
public GroupByQueryEngine(
Supplier<GroupByQueryConfig> config,
@Global NonBlockingPool<ByteBuffer> intermediateResultsBufferPool
@Global BlockingPool<ByteBuffer> intermediateResultsBufferPool
)
{
this.config = config;
Expand Down Expand Up @@ -107,7 +107,7 @@ public Sequence<Row> process(final GroupByQuery query, final StorageAdapter stor
null
);

final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.takeOrFailOnTimeout(60000);

return Sequences.concat(
Sequences.withBaggage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.collections.NonBlockingPool;
import io.druid.common.guava.GuavaUtils;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
Expand All @@ -47,7 +46,6 @@
import io.druid.segment.incremental.IndexSizeExceededException;
import org.joda.time.DateTime;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -60,7 +58,6 @@ public class GroupByQueryHelper
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config,
NonBlockingPool<ByteBuffer> bufferPool,
final boolean combine
)
{
Expand Down Expand Up @@ -118,23 +115,13 @@ public String apply(DimensionSpec input)
.withMinTimestamp(granTimeStart.getMillis())
.build();

if (query.getContextValue("useOffheap", false)) {
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOffheap(bufferPool);
} else {
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOnheap();
}
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOnheap();

Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
Expand Down Expand Up @@ -189,15 +176,13 @@ public Queue accumulate(Queue accumulated, T in)
public static IncrementalIndex makeIncrementalIndex(
GroupByQuery query,
GroupByQueryConfig config,
NonBlockingPool<ByteBuffer> bufferPool,
Sequence<Row> rows,
boolean combine
)
{
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config,
bufferPool,
combine
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.parsers.CloseableIterator;
Expand Down Expand Up @@ -96,7 +95,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
public ConcurrentGrouper(
final GroupByQueryConfig groupByQueryConfig,
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
final Supplier<ByteBuffer> combineBufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
Expand Down Expand Up @@ -138,7 +137,7 @@ public ConcurrentGrouper(

ConcurrentGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
final Supplier<ByteBuffer> combineBufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
Expand Down
Loading