From 1b4e18a19a308a188633192e7cb09defdff2b157 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 2 Feb 2018 09:44:43 -0600 Subject: [PATCH 1/3] Use DefaultBlockingPool for Global Processing Pool instead of StupidPool that can allocate arbitrary number of buffers and cause crashes. --- .../GroupByTypeInterfaceBenchmark.java | 8 ++------ .../benchmark/TopNTypeInterfaceBenchmark.java | 8 +++----- .../benchmark/query/GroupByBenchmark.java | 8 ++------ .../druid/benchmark/query/TopNBenchmark.java | 8 +++----- .../io/druid/collections/BlockingPool.java | 20 +++++++++++++++++++ .../druid/query/GroupByMergedQueryRunner.java | 6 +++--- .../query/groupby/GroupByQueryEngine.java | 8 ++++---- .../query/groupby/GroupByQueryHelper.java | 6 +++--- .../GroupByMergingQueryRunnerV2.java | 7 +++---- .../epinephelinae/GroupByQueryEngineV2.java | 6 +++--- .../groupby/strategy/GroupByStrategyV1.java | 6 +++--- .../groupby/strategy/GroupByStrategyV2.java | 5 ++--- .../AggregateTopNMetricFirstAlgorithm.java | 6 +++--- .../druid/query/topn/PooledTopNAlgorithm.java | 8 ++++---- .../io/druid/query/topn/TopNQueryEngine.java | 6 +++--- .../query/topn/TopNQueryRunnerFactory.java | 6 +++--- .../segment/incremental/IncrementalIndex.java | 4 ++-- .../incremental/OffheapIncrementalIndex.java | 12 +++++------ .../io/druid/guice/DruidProcessingModule.java | 10 +++------- .../druid/guice/RouterProcessingModule.java | 8 +++----- 20 files changed, 78 insertions(+), 78 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index d4d7d0be468b..4cc749753ea0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -33,8 +33,6 @@ import io.druid.benchmark.query.QueryBenchmarkUtil; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; -import io.druid.collections.NonBlockingPool; -import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.hll.HyperLogLogHash; @@ -354,11 +352,9 @@ public void setup() throws IOException } } - NonBlockingPool bufferPool = new StupidPool<>( - "GroupByBenchmark-computeBufferPool", + BlockingPool bufferPool = new DefaultBlockingPool<>( new OffheapBufferGenerator("compute", 250_000_000), - 0, - Integer.MAX_VALUE + 2 ); // limit of 2 is required since we simulate both historical merge and broker merge in the same process diff --git a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java index 98267a87f0b6..52524ec0fe58 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java @@ -26,7 +26,7 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.benchmark.query.QueryBenchmarkUtil; -import io.druid.collections.StupidPool; +import io.druid.collections.DefaultBlockingPool; import io.druid.data.input.InputRow; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; @@ -300,11 +300,9 @@ public void setup() throws IOException } factory = new TopNQueryRunnerFactory( - new StupidPool<>( - "TopNBenchmark-compute-bufferPool", + new DefaultBlockingPool<>( new 
OffheapBufferGenerator("compute", 250000000), - 0, - Integer.MAX_VALUE + 2 ), new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()), QueryBenchmarkUtil.NOOP_QUERYWATCHER diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 55cfc7a5cfa9..a60c79aeaf6c 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -34,8 +34,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; -import io.druid.collections.NonBlockingPool; -import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.hll.HyperLogLogHash; @@ -448,11 +446,9 @@ public void setup() throws IOException } } - NonBlockingPool bufferPool = new StupidPool<>( - "GroupByBenchmark-computeBufferPool", + BlockingPool bufferPool = new DefaultBlockingPool<>( new OffheapBufferGenerator("compute", 250_000_000), - 0, - Integer.MAX_VALUE + 2 ); // limit of 2 is required since we simulate both historical merge and broker merge in the same process diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java index 5bf7d36b3ed4..4746164fbf92 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java @@ -26,7 +26,7 @@ import io.druid.benchmark.datagen.BenchmarkDataGenerator; import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; -import io.druid.collections.StupidPool; +import io.druid.collections.DefaultBlockingPool; import io.druid.data.input.InputRow; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; @@ -273,11 +273,9 @@ public void setup() throws IOException } factory = new TopNQueryRunnerFactory( - new StupidPool<>( - "TopNBenchmark-compute-bufferPool", + new DefaultBlockingPool<>( new OffheapBufferGenerator("compute", 250000000), - 0, - Integer.MAX_VALUE + 1 ), new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()), QueryBenchmarkUtil.NOOP_QUERYWATCHER diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index fdcf7f2356d4..1a734fdf3056 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -19,6 +19,8 @@ package io.druid.collections; +import io.druid.java.util.common.RE; + import java.util.List; public interface BlockingPool @@ -35,6 +37,24 @@ public interface BlockingPool */ ReferenceCountingResourceHolder take(long timeoutMs); + /** + * Take a resource from the pool, waiting up to the + * specified wait time if necessary for an element to become available. + * + * @param timeoutMs maximum time to wait for a resource, in milliseconds. + * + * @return a resource, or throw RuntimeException on timeout. 
+ */ + default ReferenceCountingResourceHolder takeOrFailOnTimeout(long timeoutMs) + { + ReferenceCountingResourceHolder result = take(timeoutMs); + if (result == null) { + throw new RE("Failed to get buffer in [%s] ms.", timeoutMs); + } else { + return result; + } + } + /** * Take a resource from the pool, waiting if necessary until an element becomes available. * diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index dc90592eb57b..ca4b6b121535 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -60,13 +60,13 @@ public class GroupByMergedQueryRunner implements QueryRunner private final ListeningExecutorService exec; private final Supplier configSupplier; private final QueryWatcher queryWatcher; - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; public GroupByMergedQueryRunner( ExecutorService exec, Supplier configSupplier, QueryWatcher queryWatcher, - NonBlockingPool bufferPool, + BlockingPool bufferPool, Iterable> queryables ) { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 8191d00b916f..66f0fd67c412 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -27,7 +27,7 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.inject.Inject; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -71,12 +71,12 @@ public class GroupByQueryEngine private static final int MISSING_VALUE = -1; private final Supplier config; - private final NonBlockingPool intermediateResultsBufferPool; + private final BlockingPool intermediateResultsBufferPool; @Inject public GroupByQueryEngine( Supplier config, - @Global NonBlockingPool intermediateResultsBufferPool + @Global BlockingPool intermediateResultsBufferPool ) { this.config = config; @@ -107,7 +107,7 @@ public Sequence process(final GroupByQuery query, final StorageAdapter stor null ); - final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); + final ResourceHolder bufferHolder = intermediateResultsBufferPool.takeOrFailOnTimeout(60000); return Sequences.concat( Sequences.withBaggage( diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index f8b070d9e964..5f1a9647b6b3 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -22,7 +22,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import 
io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.common.guava.GuavaUtils; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; @@ -60,7 +60,7 @@ public class GroupByQueryHelper public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, - NonBlockingPool bufferPool, + BlockingPool bufferPool, final boolean combine ) { @@ -189,7 +189,7 @@ public Queue accumulate(Queue accumulated, T in) public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, GroupByQueryConfig config, - NonBlockingPool bufferPool, + BlockingPool bufferPool, Sequence rows, boolean combine ) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 752820a28e2f..2b90bbc5ca23 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -34,7 +34,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; -import io.druid.collections.NonBlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; import io.druid.collections.ResourceHolder; @@ -83,7 +82,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final ListeningExecutorService exec; private final QueryWatcher queryWatcher; private final int concurrencyHint; - private final NonBlockingPool processingBufferPool; + private final BlockingPool processingBufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final String processingTmpDir; @@ -95,7 +94,7 @@ public GroupByMergingQueryRunnerV2( QueryWatcher queryWatcher, Iterable> queryables, int concurrencyHint, - NonBlockingPool processingBufferPool, + BlockingPool processingBufferPool, BlockingPool mergeBufferPool, int mergeBufferSize, ObjectMapper spillMapper, @@ -169,7 +168,7 @@ public Sequence run(final QueryPlus queryPlus, final Map get() { if (!initialized) { - buffer = processingBufferPool.take(); + buffer = processingBufferPool.takeOrFailOnTimeout(60000); initialized = true; } return buffer; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 6db22e1efb19..ec611de9e707 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -24,7 +24,7 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -90,7 +90,7 @@ private GroupByQueryEngineV2() public static Sequence process( final GroupByQuery query, final StorageAdapter storageAdapter, - final NonBlockingPool intermediateResultsBufferPool, + final BlockingPool intermediateResultsBufferPool, final GroupByQueryConfig config ) { @@ -122,7 +122,7 @@ 
public static Sequence process( return columnCapabilities != null && !columnCapabilities.hasMultipleValues(); }); - final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); + final ResourceHolder bufferHolder = intermediateResultsBufferPool.takeOrFailOnTimeout(60000); final String fudgeTimestampString = Strings.emptyToNull( query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, "") diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 180659999516..0dcb1556638e 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -29,7 +29,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; @@ -65,14 +65,14 @@ public class GroupByStrategyV1 implements GroupByStrategy private final Supplier configSupplier; private final GroupByQueryEngine engine; private final QueryWatcher queryWatcher; - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; @Inject public GroupByStrategyV1( Supplier configSupplier, GroupByQueryEngine engine, QueryWatcher queryWatcher, - @Global NonBlockingPool bufferPool + @Global BlockingPool bufferPool ) { this.configSupplier = configSupplier; diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 5ad75004b760..533912d48596 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.druid.collections.BlockingPool; -import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -81,7 +80,7 @@ public class GroupByStrategyV2 implements GroupByStrategy private final DruidProcessingConfig processingConfig; private final Supplier configSupplier; - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final QueryWatcher queryWatcher; @@ -90,7 +89,7 @@ public class GroupByStrategyV2 implements GroupByStrategy public GroupByStrategyV2( DruidProcessingConfig processingConfig, Supplier configSupplier, - @Global NonBlockingPool bufferPool, + @Global BlockingPool bufferPool, @Merging BlockingPool mergeBufferPool, @Smile ObjectMapper spillMapper, QueryWatcher queryWatcher diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index 14ce9694b788..08e86615e186 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -19,7 +19,7 @@ package io.druid.query.topn; -import 
io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.query.ColumnSelectorPlus; @@ -41,12 +41,12 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm bufferPool; + private final BlockingPool bufferPool; public AggregateTopNMetricFirstAlgorithm( Capabilities capabilities, TopNQuery query, - NonBlockingPool bufferPool + BlockingPool bufferPool ) { this.capabilities = capabilities; diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index d1bbf954d95c..d17fe32f6950 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; @@ -193,13 +193,13 @@ private static void computeSpecializedScanAndAggregateImplementations() } private final TopNQuery query; - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below public PooledTopNAlgorithm( Capabilities capabilities, TopNQuery query, - NonBlockingPool bufferPool + BlockingPool bufferPool ) { super(capabilities); @@ -212,7 +212,7 @@ public PooledTopNParams makeInitParams( ColumnSelectorPlus selectorPlus, Cursor cursor ) { - ResourceHolder resultsBufHolder = bufferPool.take(); + ResourceHolder resultsBufHolder = bufferPool.takeOrFailOnTimeout(60000); ByteBuffer resultsBuf = resultsBufHolder.get(); resultsBuf.clear(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index a43f6b5abe04..58d11e3497bb 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -22,7 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -49,9 +49,9 @@ public class TopNQueryEngine { - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; - public TopNQueryEngine(NonBlockingPool bufferPool) + public TopNQueryEngine(BlockingPool bufferPool) { this.bufferPool = bufferPool; } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index cf8c3aab69b5..3545e32e467b 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -20,7 +20,7 @@ package io.druid.query.topn; import com.google.inject.Inject; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import 
io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -41,13 +41,13 @@ */ public class TopNQueryRunnerFactory implements QueryRunnerFactory, TopNQuery> { - private final NonBlockingPool computationBufferPool; + private final BlockingPool computationBufferPool; private final TopNQueryQueryToolChest toolchest; private final QueryWatcher queryWatcher; @Inject public TopNQueryRunnerFactory( - @Global NonBlockingPool computationBufferPool, + @Global BlockingPool computationBufferPool, TopNQueryQueryToolChest toolchest, QueryWatcher queryWatcher ) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 09cb0cfd3dee..d1cfc414fec6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -29,7 +29,7 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.common.guava.GuavaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; @@ -398,7 +398,7 @@ public IncrementalIndex buildOnheap() ); } - public IncrementalIndex buildOffheap(final NonBlockingPool bufferPool) + public IncrementalIndex buildOffheap(final BlockingPool bufferPool) { if (maxRowCount <= 0) { throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 3d45604b69c9..636d65f04a6b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -21,7 +21,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Maps; -import io.druid.collections.NonBlockingPool; +import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.data.input.InputRow; import io.druid.java.util.common.IAE; @@ -47,7 +47,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex { private static final Logger log = new Logger(OffheapIncrementalIndex.class); - private final NonBlockingPool bufferPool; + private final BlockingPool bufferPool; private final List> aggBuffers = new ArrayList<>(); private final List indexAndOffsets = new ArrayList<>(); @@ -75,7 +75,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex boolean concurrentEventAdd, boolean sortFacts, int maxRowCount, - NonBlockingPool bufferPool + BlockingPool bufferPool ) { super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); @@ -85,8 +85,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts); - //check that stupid pool gives buffers that can hold at least one row's aggregators - ResourceHolder bb = bufferPool.take(); + //check that buffer pool gives buffers that can hold at least one row's aggregators + ResourceHolder bb = bufferPool.takeOrFailOnTimeout(60000); if (bb.get().capacity() < aggsTotalSize) { bb.close(); throw new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); @@ -188,7 +188,7 @@ protected Integer addToFacts( lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { aggBuffer = lastBuffer; } else { - ResourceHolder bb = bufferPool.take(); + ResourceHolder bb = bufferPool.takeOrFailOnTimeout(60000); aggBuffers.add(bb); bufferIndex = aggBuffers.size() - 1; bufferOffset = 0; diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 5d8f369652d6..3700167af7fd 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -28,8 +28,6 @@ import io.druid.client.cache.CacheConfig; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; -import io.druid.collections.NonBlockingPool; -import io.druid.collections.StupidPool; import io.druid.common.utils.VMUtils; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Global; @@ -105,14 +103,12 @@ public ExecutorService getProcessingExecutorService( @Provides @LazySingleton @Global - public NonBlockingPool getIntermediateResultsPool(DruidProcessingConfig config) + public BlockingPool getIntermediateResultsPool(DruidProcessingConfig config) { verifyDirectMemory(config); - return new StupidPool<>( - "intermediate processing pool", + return new DefaultBlockingPool<>( new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()), - config.getNumThreads(), - config.poolCacheMaxCount() + config.getNumThreads() ); } diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java b/server/src/main/java/io/druid/guice/RouterProcessingModule.java index fbc90714bbc8..1ed16c350bcf 100644 --- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java @@ -25,13 +25,11 @@ import io.druid.client.cache.CacheConfig; import io.druid.collections.BlockingPool; import io.druid.collections.DummyBlockingPool; -import io.druid.collections.DummyNonBlockingPool; -import io.druid.collections.NonBlockingPool; -import io.druid.java.util.common.concurrent.Execs; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Merging; import io.druid.guice.annotations.Processing; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.concurrent.ExecutorServiceConfig; import io.druid.java.util.common.logger.Logger; import io.druid.query.DruidProcessingConfig; @@ -86,9 +84,9 @@ public ExecutorService getProcessingExecutorService(DruidProcessingConfig config @Provides @LazySingleton @Global - public NonBlockingPool getIntermediateResultsPool() + public BlockingPool getIntermediateResultsPool() { - return DummyNonBlockingPool.instance(); + return DummyBlockingPool.instance(); } @Provides From fcf39b100f1062f497c21a28e039de458238159f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 8 Feb 
2018 13:11:52 -0600 Subject: [PATCH 2/3] remove OffheapIncrementalIndex --- .../GroupByTypeInterfaceBenchmark.java | 3 +- .../benchmark/query/GroupByBenchmark.java | 3 +- .../druid/query/GroupByMergedQueryRunner.java | 6 - .../query/groupby/GroupByQueryHelper.java | 29 +- .../groupby/strategy/GroupByStrategyV1.java | 13 +- .../segment/incremental/IncrementalIndex.java | 19 - .../incremental/OffheapIncrementalIndex.java | 340 ------------------ .../segment/data/IncrementalIndexTest.java | 59 --- .../incremental/IncrementalIndexTest.java | 31 -- 9 files changed, 11 insertions(+), 492 deletions(-) delete mode 100644 processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 4cc749753ea0..92c7c2dff1d4 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -408,8 +408,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - bufferPool + QueryBenchmarkUtil.NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index a60c79aeaf6c..1599c434208f 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -502,8 +502,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - bufferPool + QueryBenchmarkUtil.NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index ca4b6b121535..0bb6e67db570 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -29,7 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.BlockingPool; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -42,7 +41,6 @@ import io.druid.query.groupby.GroupByQueryHelper; import io.druid.segment.incremental.IncrementalIndex; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Queue; @@ -60,13 +58,11 @@ public class GroupByMergedQueryRunner implements QueryRunner private final ListeningExecutorService exec; private final Supplier configSupplier; private final QueryWatcher queryWatcher; - private final BlockingPool bufferPool; public GroupByMergedQueryRunner( ExecutorService exec, Supplier configSupplier, QueryWatcher queryWatcher, - BlockingPool bufferPool, Iterable> queryables ) { @@ -74,7 +70,6 @@ public GroupByMergedQueryRunner( this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = 
configSupplier; - this.bufferPool = bufferPool; } @Override @@ -86,7 +81,6 @@ public Sequence run(final QueryPlus queryPlus, final Map r final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, querySpecificConfig, - bufferPool, true ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 5f1a9647b6b3..5722b22e17aa 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -22,7 +22,6 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.druid.collections.BlockingPool; import io.druid.common.guava.GuavaUtils; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; @@ -47,7 +46,6 @@ import io.druid.segment.incremental.IndexSizeExceededException; import org.joda.time.DateTime; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Queue; @@ -60,7 +58,6 @@ public class GroupByQueryHelper public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, - BlockingPool bufferPool, final boolean combine ) { @@ -118,23 +115,13 @@ public String apply(DimensionSpec input) .withMinTimestamp(granTimeStart.getMillis()) .build(); - if (query.getContextValue("useOffheap", false)) { - index = new IncrementalIndex.Builder() - .setIndexSchema(indexSchema) - .setDeserializeComplexMetrics(false) - .setConcurrentEventAdd(true) - .setSortFacts(sortResults) - .setMaxRowCount(querySpecificConfig.getMaxResults()) - .buildOffheap(bufferPool); - } else { - index = new IncrementalIndex.Builder() - .setIndexSchema(indexSchema) - .setDeserializeComplexMetrics(false) - .setConcurrentEventAdd(true) - .setSortFacts(sortResults) - .setMaxRowCount(querySpecificConfig.getMaxResults()) - .buildOnheap(); - } + index = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setDeserializeComplexMetrics(false) + .setConcurrentEventAdd(true) + .setSortFacts(sortResults) + .setMaxRowCount(querySpecificConfig.getMaxResults()) + .buildOnheap(); Accumulator accumulator = new Accumulator() { @@ -189,7 +176,6 @@ public Queue accumulate(Queue accumulated, T in) public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, GroupByQueryConfig config, - BlockingPool bufferPool, Sequence rows, boolean combine ) @@ -197,7 +183,6 @@ public static IncrementalIndex makeIncrementalIndex( Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, config, - bufferPool, combine ); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 0dcb1556638e..32a42623bb4c 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -29,9 +29,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; -import io.druid.collections.BlockingPool; import io.druid.data.input.Row; -import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; import 
io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -56,7 +54,6 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.Interval; -import java.nio.ByteBuffer; import java.util.Map; import java.util.Set; @@ -65,20 +62,17 @@ public class GroupByStrategyV1 implements GroupByStrategy private final Supplier configSupplier; private final GroupByQueryEngine engine; private final QueryWatcher queryWatcher; - private final BlockingPool bufferPool; @Inject public GroupByStrategyV1( Supplier configSupplier, GroupByQueryEngine engine, - QueryWatcher queryWatcher, - @Global BlockingPool bufferPool + QueryWatcher queryWatcher ) { this.configSupplier = configSupplier; this.engine = engine; this.queryWatcher = queryWatcher; - this.bufferPool = bufferPool; } @Override @@ -119,7 +113,6 @@ public Sequence mergeResults( final IncrementalIndex index = GroupByQueryHelper.makeIncrementalIndex( query, configSupplier.get(), - bufferPool, baseRunner.run( QueryPlus.wrap( new GroupByQuery.Builder(query) @@ -213,7 +206,6 @@ public boolean apply(AggregatorFactory agg) ) ), configSupplier.get(), - bufferPool, subqueryResult, false ); @@ -225,7 +217,6 @@ public boolean apply(AggregatorFactory agg) final IncrementalIndex outerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex( outerQuery, configSupplier.get(), - bufferPool, Sequences.concat( Sequences.map( Sequences.simple(outerQuery.getIntervals()), @@ -261,7 +252,7 @@ public QueryRunner mergeRunners( final Iterable> queryRunners ) { - return new GroupByMergedQueryRunner<>(exec, configSupplier, queryWatcher, bufferPool, queryRunners); + return new GroupByMergedQueryRunner<>(exec, configSupplier, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index d1cfc414fec6..f5b6df1f34ba 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -29,7 +29,6 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import io.druid.collections.BlockingPool; import io.druid.common.guava.GuavaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; @@ -76,7 +75,6 @@ import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.lang.reflect.Array; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -397,23 +395,6 @@ public IncrementalIndex buildOnheap() maxRowCount ); } - - public IncrementalIndex buildOffheap(final BlockingPool bufferPool) - { - if (maxRowCount <= 0) { - throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); - } - - return new OffheapIncrementalIndex( - Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), - deserializeComplexMetrics, - reportParseExceptions, - concurrentEventAdd, - sortFacts, - maxRowCount, - Objects.requireNonNull(bufferPool, "bufferPool is null") - ); - } } public boolean isRollup() diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java deleted file mode 100644 index 636d65f04a6b..000000000000 --- 
a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.incremental; - -import com.google.common.base.Supplier; -import com.google.common.collect.Maps; -import io.druid.collections.BlockingPool; -import io.druid.collections.ResourceHolder; -import io.druid.data.input.InputRow; -import io.druid.java.util.common.IAE; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.io.Closer; -import io.druid.java.util.common.logger.Logger; -import io.druid.java.util.common.parsers.ParseException; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.BufferAggregator; -import io.druid.segment.ColumnSelectorFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - */ -public class OffheapIncrementalIndex extends IncrementalIndex -{ - private static final Logger log = new Logger(OffheapIncrementalIndex.class); - - private final BlockingPool bufferPool; - - private final List> aggBuffers = new ArrayList<>(); - private final List indexAndOffsets = new ArrayList<>(); - - private final FactsHolder facts; - - private final AtomicInteger indexIncrement = new AtomicInteger(0); - - protected final int maxRowCount; - - private volatile Map selectors; - - //given a ByteBuffer and an offset where all aggregates for a row are stored - //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate - //is stored - private volatile int[] aggOffsetInBuffer; - private volatile int aggsTotalSize; - - private String outOfRowsReason = null; - - OffheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - boolean deserializeComplexMetrics, - boolean reportParseExceptions, - boolean concurrentEventAdd, - boolean sortFacts, - int maxRowCount, - BlockingPool bufferPool - ) - { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); - this.maxRowCount = maxRowCount; - this.bufferPool = bufferPool; - - this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) - : new PlainFactsHolder(sortFacts); - - //check that buffer pool gives buffers that can hold at least one row's aggregators - ResourceHolder bb = bufferPool.takeOrFailOnTimeout(60000); - if (bb.get().capacity() < aggsTotalSize) { - bb.close(); - throw new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); - } - aggBuffers.add(bb); - } - - @Override - public FactsHolder getFacts() - { - return facts; - } - - @Override - protected BufferAggregator[] initAggs( - final AggregatorFactory[] metrics, - final Supplier rowSupplier, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd - ) - { - selectors = Maps.newHashMap(); - aggOffsetInBuffer = new int[metrics.length]; - - for (int i = 0; i < metrics.length; i++) { - AggregatorFactory agg = metrics[i]; - - ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( - agg, - rowSupplier, - deserializeComplexMetrics - ); - - selectors.put( - agg.getName(), - new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd) - ); - - if (i == 0) { - aggOffsetInBuffer[i] = 0; - } else { - aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSize(); - } - } - - aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); - - return new BufferAggregator[metrics.length]; - } - - @Override - protected Integer addToFacts( - AggregatorFactory[] metrics, - boolean deserializeComplexMetrics, - boolean reportParseExceptions, - InputRow row, - AtomicInteger numEntries, - TimeAndDims key, - ThreadLocal rowContainer, - Supplier rowSupplier, - boolean skipMaxRowsInMemoryCheck // ignored, we always want to check this for offheap - ) throws IndexSizeExceededException - { - ByteBuffer aggBuffer; - int bufferIndex; - int bufferOffset; - - synchronized (this) { - final int priorIndex = facts.getPriorIndex(key); - if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) { - final int[] indexAndOffset = indexAndOffsets.get(priorIndex); - bufferIndex = indexAndOffset[0]; - bufferOffset = indexAndOffset[1]; - aggBuffer = aggBuffers.get(bufferIndex).get(); - } else { - if (metrics.length > 0 && getAggs()[0] == null) { - // note: creation of Aggregators is done lazily when at least one row from input is available - // so that FilteredAggregators could be initialized correctly. - rowContainer.set(row); - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - getAggs()[i] = agg.factorizeBuffered(selectors.get(agg.getName())); - } - rowContainer.set(null); - } - - bufferIndex = aggBuffers.size() - 1; - ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); - int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() - ? null - : indexAndOffsets.get(indexAndOffsets.size() - 1); - - if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) { - throw new ISE("last row's aggregate's buffer and last buffer index must be same"); - } - - bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? 
lastAggregatorsIndexAndOffset[1] : 0); - if (lastBuffer != null && - lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { - aggBuffer = lastBuffer; - } else { - ResourceHolder bb = bufferPool.takeOrFailOnTimeout(60000); - aggBuffers.add(bb); - bufferIndex = aggBuffers.size() - 1; - bufferOffset = 0; - aggBuffer = bb.get(); - } - - for (int i = 0; i < metrics.length; i++) { - getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); - } - - // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX) { - throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); - } - - final int rowIndex = indexIncrement.getAndIncrement(); - - // note that indexAndOffsets must be updated before facts, because as soon as we update facts - // concurrent readers get hold of it and might ask for newly added row - indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); - final int prev = facts.putIfAbsent(key, rowIndex); - if (TimeAndDims.EMPTY_ROW_INDEX == prev) { - numEntries.incrementAndGet(); - } else { - throw new ISE("WTF! we are in sychronized block."); - } - } - } - - rowContainer.set(row); - - for (int i = 0; i < metrics.length; i++) { - final BufferAggregator agg = getAggs()[i]; - - synchronized (agg) { - try { - agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); - } - catch (ParseException e) { - // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. - if (reportParseExceptions) { - throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName()); - } else { - log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName()); - } - } - } - } - rowContainer.set(null); - return numEntries.get(); - } - - @Override - public int getLastRowIndex() - { - return indexIncrement.get() - 1; - } - - @Override - public boolean canAppendRow() - { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); - } - return canAdd; - } - - @Override - public String getOutOfRowsReason() - { - return outOfRowsReason; - } - - @Override - protected BufferAggregator[] getAggsForRow(int rowOffset) - { - return getAggs(); - } - - @Override - protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) - { - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]); - } - - @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public long getMetricLongValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - 
return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public double getMetricDoubleValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getDouble(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - /** - * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing - */ - @Override - public void close() - { - super.close(); - facts.clear(); - indexAndOffsets.clear(); - - if (selectors != null) { - selectors.clear(); - } - - Closer c = Closer.create(); - aggBuffers.forEach(c::register); - try { - c.close(); - } - catch (IOException e) { - throw new RuntimeException(e); - } - aggBuffers.clear(); - } -} diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 56b705fa72f1..34c90d809a07 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -19,7 +19,6 @@ package io.druid.segment.data; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -30,7 +29,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionsSpec; @@ -74,7 +72,6 @@ import org.junit.runners.Parameterized; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -126,31 +123,6 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) } } }, - { - new IndexCreator() - { - @Override - public IncrementalIndex createIndex(AggregatorFactory[] factories) - { - return new IncrementalIndex.Builder() - .setSimpleTestingIndexSchema(factories) - .setMaxRowCount(1000000) - .buildOffheap( - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) - ); - } - } - }, { new IndexCreator() { @@ -160,38 +132,7 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) return IncrementalIndexTest.createNoRollupIndex(factories); } } - }, - { - new IndexCreator() - { - @Override - public IncrementalIndex createIndex(AggregatorFactory[] factories) - { - return new IncrementalIndex.Builder() - .setIndexSchema( - new IncrementalIndexSchema.Builder() - .withMetrics(factories) - .withRollup(false) - .build() - ) - .setMaxRowCount(1000000) - .buildOffheap( - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) - ); - } - } } - } ); } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index ee728cb51605..5d959bb41bbb 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ 
b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -19,11 +19,9 @@ package io.druid.segment.incremental; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionSchema; @@ -44,7 +42,6 @@ import org.junit.runners.Parameterized; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -112,34 +109,6 @@ public IncrementalIndex createIndex() } } ); - constructors.add( - new Object[]{ - new IndexCreator() - { - @Override - public IncrementalIndex createIndex() - { - return new IncrementalIndex.Builder() - .setIndexSchema(schema) - .setSortFacts(sortFacts) - .setMaxRowCount(1000000) - .buildOffheap( - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) - ); - } - } - } - ); } return constructors; From a5ecf6172dab421a6b5a9c1b6ec03bc62358143f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 9 Feb 2018 09:48:29 -0600 Subject: [PATCH 3/3] dont use processing buffer in ParallelCombiner --- .../epinephelinae/ConcurrentGrouper.java | 5 +-- .../GroupByMergingQueryRunnerV2.java | 38 ++++++------------- .../epinephelinae/ParallelCombiner.java | 10 ++--- .../epinephelinae/RowBasedGrouperHelper.java | 3 +- .../groupby/strategy/GroupByStrategyV2.java | 1 - 5 files changed, 18 insertions(+), 39 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index de223f3894d8..12e361cf1104 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.parsers.CloseableIterator; @@ -96,7 +95,7 @@ public class ConcurrentGrouper implements Grouper public ConcurrentGrouper( final GroupByQueryConfig groupByQueryConfig, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + final Supplier combineBufferSupplier, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -138,7 +137,7 @@ public ConcurrentGrouper( ConcurrentGrouper( final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + final Supplier combineBufferSupplier, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 2b90bbc5ca23..624a008186ae 100644 --- 
a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -82,7 +82,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final ListeningExecutorService exec; private final QueryWatcher queryWatcher; private final int concurrencyHint; - private final BlockingPool processingBufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final String processingTmpDir; @@ -94,7 +93,6 @@ public GroupByMergingQueryRunnerV2( QueryWatcher queryWatcher, Iterable> queryables, int concurrencyHint, - BlockingPool processingBufferPool, BlockingPool mergeBufferPool, int mergeBufferSize, ObjectMapper spillMapper, @@ -106,7 +104,6 @@ public GroupByMergingQueryRunnerV2( this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.concurrencyHint = concurrencyHint; - this.processingBufferPool = processingBufferPool; this.mergeBufferPool = mergeBufferPool; this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; @@ -159,22 +156,6 @@ public Sequence run(final QueryPlus queryPlus, final Map> combineBufferSupplier = new Supplier>() - { - private boolean initialized; - private ResourceHolder buffer; - - @Override - public ResourceHolder get() - { - if (!initialized) { - buffer = processingBufferPool.takeOrFailOnTimeout(60000); - initialized = true; - } - return buffer; - } - }; - return new BaseSequence<>( new BaseSequence.IteratorMaker>() { @@ -192,18 +173,23 @@ public CloseableGrouperIterator make() ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); resources.add(temporaryStorageHolder); - final ReferenceCountingResourceHolder mergeBufferHolder; + final ReferenceCountingResourceHolder> buffers; + + // We need at least one buffer for merging and might need additional buffer to combine results + // in ParallelCombiner if configured. + final int buffersCount = config.getNumParallelCombineThreads() > 1 ? 2 : 1; + try { // This will potentially block if there are no merge buffers left in the pool. if (hasTimeout) { final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { + if (timeout <= 0 || (buffers = mergeBufferPool.takeBatch(buffersCount, timeout)) == null) { throw new TimeoutException(); } } else { - mergeBufferHolder = mergeBufferPool.take(); + buffers = mergeBufferPool.takeBatch(buffersCount); } - resources.add(mergeBufferHolder); + resources.add(buffers); } catch (Exception e) { throw new QueryInterruptedException(e); @@ -214,8 +200,8 @@ public CloseableGrouperIterator make() false, null, config, - Suppliers.ofInstance(mergeBufferHolder.get()), - combineBufferSupplier, + Suppliers.ofInstance(buffers.get().get(0)), + buffersCount == 2 ? 
Suppliers.ofInstance(buffers.get().get(1)) : null, concurrencyHint, temporaryStorage, spillMapper, @@ -256,7 +242,7 @@ public ListenableFuture apply(final QueryRunner input) public AggregateResult call() throws Exception { try ( - Releaser bufferReleaser = mergeBufferHolder.increment(); + Releaser bufferReleaser = buffers.increment(); Releaser grouperReleaser = grouperHolder.increment() ) { final AggregateResult retVal = input.run(queryPlusForRunners, responseContext) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java index d043a9070d76..2037ed40f793 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -26,7 +26,6 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -84,7 +83,7 @@ public class ParallelCombiner // details. private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2; - private final Supplier> combineBufferSupplier; + private final Supplier combineBufferSupplier; private final AggregatorFactory[] combiningFactories; private final KeySerdeFactory combineKeySerdeFactory; private final ListeningExecutorService executor; @@ -98,7 +97,7 @@ public class ParallelCombiner private final int intermediateCombineDegree; public ParallelCombiner( - Supplier> combineBufferSupplier, + Supplier combineBufferSupplier, AggregatorFactory[] combiningFactories, KeySerdeFactory combineKeySerdeFactory, ListeningExecutorService executor, @@ -135,9 +134,7 @@ public CloseableIterator> combine( List mergedDictionary ) { - // CombineBuffer is initialized when this method is called and closed after the result iterator is done - final ResourceHolder combineBufferHolder = combineBufferSupplier.get(); - final ByteBuffer combineBuffer = combineBufferHolder.get(); + final ByteBuffer combineBuffer = combineBufferSupplier.get(); final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity( combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary), combiningFactories @@ -170,7 +167,6 @@ public CloseableIterator> combine( final List combineFutures = combineIteratorAndFutures.rhs; final Closer closer = Closer.create(); - closer.register(combineBufferHolder); closer.register(() -> checkCombineFutures(combineFutures)); return CloseableIterators.wrap(combineIterator, closer); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 290cf13753bf..078d9c71e755 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -33,7 +33,6 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; import io.druid.common.utils.IntArrayUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -140,7 
+139,7 @@ public static Pair, Accumulator> crea final Map rawInputRowSignature, final GroupByQueryConfig config, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + final Supplier combineBufferSupplier, final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 533912d48596..69c28fab5972 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -337,7 +337,6 @@ public QueryRunner mergeRunners( queryWatcher, queryRunners, processingConfig.getNumThreads(), - bufferPool, mergeBufferPool, processingConfig.intermediateComputeSizeBytes(), spillMapper,
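
Editor's note on the core behavioral change in PATCH 1/3: callers that previously drew intermediate processing buffers from an unbounded StupidPool now draw them from a bounded BlockingPool (sized to config.getNumThreads() in DruidProcessingModule) and call the new takeOrFailOnTimeout(timeoutMs) default method, failing fast instead of allocating without limit. The following standalone Java sketch is not part of the patch; the class name, pool capacity, buffer size, and timeouts are illustrative assumptions. It only mirrors the take-or-throw pattern of the default method added to io.druid.collections.BlockingPool, using a plain ArrayBlockingQueue instead of Druid's pool classes.

// Standalone sketch (not part of the patch): mirrors the takeOrFailOnTimeout
// default method added to io.druid.collections.BlockingPool in PATCH 1/3.
// Class name, capacity, buffer size, and timeouts are illustrative only.
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedBufferPoolSketch
{
  private final BlockingQueue<ByteBuffer> pool;

  public BoundedBufferPoolSketch(int capacity, int bufferSizeBytes)
  {
    // Pre-allocate a fixed number of buffers; the pool never grows beyond this.
    this.pool = new ArrayBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      pool.offer(ByteBuffer.allocate(bufferSizeBytes));
    }
  }

  // Analogous to BlockingPool.take(timeoutMs): returns null when no buffer
  // becomes available within the timeout.
  public ByteBuffer take(long timeoutMs) throws InterruptedException
  {
    return pool.poll(timeoutMs, TimeUnit.MILLISECONDS);
  }

  // Analogous to the new default takeOrFailOnTimeout(timeoutMs): fail fast
  // with a RuntimeException on timeout instead of allocating another buffer.
  public ByteBuffer takeOrFailOnTimeout(long timeoutMs) throws InterruptedException
  {
    ByteBuffer result = take(timeoutMs);
    if (result == null) {
      throw new RuntimeException("Failed to get buffer in [" + timeoutMs + "] ms.");
    }
    return result;
  }

  public void giveBack(ByteBuffer buffer)
  {
    buffer.clear();
    pool.offer(buffer);
  }

  public static void main(String[] args) throws InterruptedException
  {
    BoundedBufferPoolSketch pool = new BoundedBufferPoolSketch(2, 1024);
    ByteBuffer first = pool.takeOrFailOnTimeout(60_000);
    ByteBuffer second = pool.takeOrFailOnTimeout(60_000);
    try {
      // A third request cannot be served; with a bounded pool it fails fast
      // rather than handing out an arbitrary number of buffers.
      pool.takeOrFailOnTimeout(10);
    }
    catch (RuntimeException expected) {
      System.out.println(expected.getMessage());
    }
    pool.giveBack(first);
    pool.giveBack(second);
  }
}

Usage-wise this matches how the patched call sites behave: GroupByQueryEngine, GroupByQueryEngineV2, PooledTopNAlgorithm, and (before its removal in PATCH 2/3) OffheapIncrementalIndex all request a buffer with a 60,000 ms timeout and surface a RuntimeException if the bounded pool is exhausted, rather than silently growing the pool as StupidPool allowed.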