diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 19fe385a2926..445068927534 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -393,8 +393,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - bufferPool + QueryBenchmarkUtil.NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 36512ad42848..b85dad30a7d3 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -371,8 +371,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory( new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - bufferPool + QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new GroupByStrategyV2( processingConfig, diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java index bbf98fd7ff54..481bc4515997 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java @@ -508,8 +508,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryBenchmarkUtil.NOOP_QUERYWATCHER, - bufferPool + QueryBenchmarkUtil.NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c20d801cf592..6e79c1be0764 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2090,9 +2090,9 @@ Supported query contexts: |Key|Description|Default| |---|-----------|-------| -|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None| -|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None| -|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false| +|`maxIntermediateRows`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for a groupBy v1 query.|None| +|`maxResults`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxResults` for a groupBy v1 query.|None| +|`useOffheap`|Ignored by groupBy v2, and no longer supported for groupBy v1. Enabling this option with groupBy v1 will result in an error. For off-heap aggregation, switch to groupBy v2, which always operates off-heap.|false| ### Router diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 0b66a5f161b9..6128b2bcbc78 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -451,9 +451,9 @@ Supported query contexts: |Key|Description|Default| |---|-----------|-------| -|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None| -|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None| -|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false| +|`maxIntermediateRows`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for a groupBy v1 query.|None| +|`maxResults`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxResults` for a groupBy v1 query.|None| +|`useOffheap`|Ignored by groupBy v2, and no longer supported for groupBy v1. Enabling this option with groupBy v1 will result in an error. For off-heap aggregation, switch to groupBy v2, which always operates off-heap.|false| #### Array based result rows diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java index ae991c1856db..3e8122eeb753 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java @@ -19,7 +19,6 @@ package org.apache.druid.query.aggregation.datasketches.tuple; -import com.google.common.util.concurrent.Striped; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketches; import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch; @@ -31,12 +30,9 @@ import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; - import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator builds sketches from raw data. @@ -45,16 +41,12 @@ */ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregator { - - private static final int NUM_STRIPES = 64; // for locking per buffer position (power of 2 to make index computation faster) - private final DimensionSelector keySelector; private final BaseDoubleColumnValueSelector[] valueSelectors; private final int nominalEntries; private final int maxIntermediateSize; @Nullable private double[] values; // not part of the state, but to reuse in aggregate() method - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); public ArrayOfDoublesSketchBuildBufferAggregator( final DimensionSelector keySelector, @@ -76,15 +68,10 @@ public void init(final ByteBuffer buf, final int position) final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) - .setNumberOfValues(valueSelectors.length) - .setNumberOfValues(valueSelectors.length).build(region); + .setNumberOfValues(valueSelectors.length) + .setNumberOfValues(valueSelectors.length).build(region); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * https://github.com/apache/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -101,24 +88,14 @@ public void aggregate(final ByteBuffer buf, final int position) // might might be considered, but it would increase complexity including relocate() support. final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock(); - lock.lock(); - try { - final ArrayOfDoublesUpdatableSketch sketch = ArrayOfDoublesSketches.wrapUpdatableSketch(region); - for (int i = 0, keysSize = keys.size(); i < keysSize; i++) { - final String key = keySelector.lookupName(keys.get(i)); - sketch.update(key, values); - } - } - finally { - lock.unlock(); + final ArrayOfDoublesUpdatableSketch sketch = ArrayOfDoublesSketches.wrapUpdatableSketch(region); + for (int i = 0, keysSize = keys.size(); i < keysSize; i++) { + final String key = keySelector.lookupName(keys.get(i)); + sketch.update(key, values); } } /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * https://github.com/apache/druid/pull/3956 * The returned sketch is a separate instance of ArrayOfDoublesCompactSketch * representing the current state of the aggregation, and is not affected by consequent * aggregate() calls @@ -128,16 +105,9 @@ public Object get(final ByteBuffer buf, final int position) { final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); - lock.lock(); - try { - final ArrayOfDoublesUpdatableSketch sketch = (ArrayOfDoublesUpdatableSketch) ArrayOfDoublesSketches - .wrapSketch(region); - return sketch.compact(); - } - finally { - lock.unlock(); - } + final ArrayOfDoublesUpdatableSketch sketch = (ArrayOfDoublesUpdatableSketch) ArrayOfDoublesSketches + .wrapSketch(region); + return sketch.compact(); } @Override @@ -164,18 +134,4 @@ public void inspectRuntimeShape(final RuntimeShapeInspector inspector) inspector.visit("keySelector", keySelector); inspector.visit("valueSelectors", valueSelectors); } - - // compute lock index to avoid boxing in Striped.get() call - static int lockIndex(final int position) - { - return smear(position) % NUM_STRIPES; - } - - // from https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 - private static int smear(int hashCode) - { - hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); - return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); - } - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java index eb5ee6d2b820..a2daba10f313 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java @@ -19,7 +19,6 @@ package org.apache.druid.query.aggregation.datasketches.tuple; -import com.google.common.util.concurrent.Striped; import org.apache.datasketches.memory.WritableMemory; import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSetOperationBuilder; import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch; @@ -31,8 +30,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator merges existing sketches. @@ -41,14 +38,10 @@ */ public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregator { - - private static final int NUM_STRIPES = 64; // for locking per buffer position - private final BaseObjectColumnValueSelector selector; private final int nominalEntries; private final int numberOfValues; private final int maxIntermediateSize; - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); public ArrayOfDoublesSketchMergeBufferAggregator( final BaseObjectColumnValueSelector selector, @@ -69,14 +62,9 @@ public void init(final ByteBuffer buf, final int position) final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries) - .setNumberOfValues(numberOfValues).buildUnion(region); + .setNumberOfValues(numberOfValues).buildUnion(region); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * https://github.com/apache/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -86,18 +74,11 @@ public void aggregate(final ByteBuffer buf, final int position) } // Wrapping memory and ArrayOfDoublesUnion is inexpensive compared to union operations. // Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator - // might might be considered, but it would increase complexity including relocate() support. + // might be considered, but it would increase complexity including relocate() support. final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).writeLock(); - lock.lock(); - try { - final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region); - union.union(update); - } - finally { - lock.unlock(); - } + final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region); + union.union(update); } /** @@ -113,15 +94,8 @@ public Object get(final ByteBuffer buf, final int position) { final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final WritableMemory region = mem.writableRegion(position, maxIntermediateSize); - final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).readLock(); - lock.lock(); - try { - final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region); - return union.getResult(); - } - finally { - lock.unlock(); - } + final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region); + return union.getResult(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java index 5b7d550a027d..d79aad99ef67 100644 --- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.ISE; @@ -44,7 +43,6 @@ import org.apache.druid.query.groupby.GroupByQueryHelper; import org.apache.druid.segment.incremental.IncrementalIndex; -import java.nio.ByteBuffer; import java.util.List; import java.util.Queue; import java.util.concurrent.CancellationException; @@ -59,14 +57,12 @@ public class GroupByMergedQueryRunner implements QueryRunner private final Iterable> queryables; private final Supplier configSupplier; private final QueryWatcher queryWatcher; - private final NonBlockingPool bufferPool; private final QueryProcessingPool queryProcessingPool; public GroupByMergedQueryRunner( QueryProcessingPool queryProcessingPool, Supplier configSupplier, QueryWatcher queryWatcher, - NonBlockingPool bufferPool, Iterable> queryables ) { @@ -74,7 +70,6 @@ public GroupByMergedQueryRunner( this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; - this.bufferPool = bufferPool; } @Override @@ -86,8 +81,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, null, - querySpecificConfig, - bufferPool + querySpecificConfig ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final boolean bySegment = QueryContexts.isBySegment(query); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java index a41263e77924..fe05e4759dca 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java @@ -32,6 +32,10 @@ * Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls * to aggregate(). This is currently (as of this documentation) implemented through the use of {@link * org.apache.druid.segment.ColumnValueSelector} objects. + * + * During ingestion, {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex} uses instances of this class + * from multiple threads. In particular, ingestion threads call {@link #aggregate()} simultaneously with query threads + * calling the various "get" methods. Instances of this class must be thread-safe. */ @ExtensionPoint public interface Aggregator extends Closeable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index b047f0ee03c5..c990184b8f63 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -35,6 +35,9 @@ * Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls * to aggregate(...). * + * Unlike {@link Aggregator}, BufferAggregators are never used by multiple threads at once. Implementations are not + * required to be thread safe. + * * @see VectorAggregator, the vectorized version */ @ExtensionPoint @@ -108,6 +111,7 @@ public interface BufferAggregator extends HotLoopCallee * @param position offset within the byte buffer at which the aggregate value is stored * @return the float representation of the aggregate */ + @SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001 float getFloat(ByteBuffer buf, int position); /** @@ -125,6 +129,7 @@ public interface BufferAggregator extends HotLoopCallee * @param position offset within the byte buffer at which the aggregate value is stored * @return the long representation of the aggregate */ + @SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001 long getLong(ByteBuffer buf, int position); /** @@ -146,6 +151,7 @@ public interface BufferAggregator extends HotLoopCallee * @param position offset within the byte buffer at which the aggregate value is stored * @return the double representation of the aggregate */ + @SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001 default double getDouble(ByteBuffer buf, int position) { return (double) getFloat(buf, position); @@ -204,6 +210,7 @@ default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, By * * @return true if the aggregated value is primitive long/double/float and aggregated value is null otherwise false. */ + @SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001 default boolean isNull(ByteBuffer buf, int position) { return false; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java index 8d0983eeba63..befff12ba6e0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java @@ -31,6 +31,9 @@ * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop} because vectorized query engines do not use * monomorphic-processing-style specialization. * + * Unlike {@link Aggregator}, VectorAggregators are never used by multiple threads at once. Implementations are not + * required to be thread safe. + * * @see BufferAggregator, the vectorized version. */ public interface VectorAggregator diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java index 332e020f095c..8a7b05096c35 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; -import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; @@ -43,11 +42,9 @@ import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; -import org.apache.druid.segment.incremental.OffheapIncrementalIndex; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import javax.annotation.Nullable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -60,8 +57,7 @@ public class GroupByQueryHelper public static Pair> createIndexAccumulatorPair( final GroupByQuery query, @Nullable final GroupByQuery subquery, - final GroupByQueryConfig config, - NonBlockingPool bufferPool + final GroupByQueryConfig config ) { final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); @@ -123,8 +119,11 @@ public String apply(DimensionSpec input) final AppendableIndexBuilder indexBuilder; if (query.getContextValue("useOffheap", false)) { - indexBuilder = new OffheapIncrementalIndex.Builder() - .setBufferPool(bufferPool); + throw new UnsupportedOperationException( + "The 'useOffheap' option is no longer available for groupBy v1. Please move to the newer groupBy engine, " + + "which always operates off-heap, by removing any custom 'druid.query.groupBy.defaultStrategy' runtime " + + "properties and 'groupByStrategy' query context parameters that you have set." + ); } else { indexBuilder = new OnheapIncrementalIndex.Builder(); } @@ -196,12 +195,11 @@ public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, @Nullable GroupByQuery subquery, GroupByQueryConfig config, - NonBlockingPool bufferPool, Sequence rows ) { final Pair> indexAccumulatorPair = - GroupByQueryHelper.createIndexAccumulatorPair(query, subquery, config, bufferPool); + GroupByQueryHelper.createIndexAccumulatorPair(query, subquery, config); return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index 72eafed7c930..c25335f7ce7e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.inject.Inject; -import org.apache.druid.collections.NonBlockingPool; -import org.apache.druid.guice.annotations.Global; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -53,7 +51,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.Interval; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -63,20 +60,17 @@ public class GroupByStrategyV1 implements GroupByStrategy private final Supplier configSupplier; private final GroupByQueryEngine engine; private final QueryWatcher queryWatcher; - private final NonBlockingPool bufferPool; @Inject public GroupByStrategyV1( Supplier configSupplier, GroupByQueryEngine engine, - QueryWatcher queryWatcher, - @Global NonBlockingPool bufferPool + QueryWatcher queryWatcher ) { this.configSupplier = configSupplier; this.engine = engine; this.queryWatcher = queryWatcher; - this.bufferPool = bufferPool; } @Override @@ -108,7 +102,6 @@ public Sequence mergeResults( query, null, configSupplier.get(), - bufferPool, baseRunner.run( QueryPlus.wrap( new GroupByQuery.Builder(query) @@ -217,7 +210,6 @@ public boolean apply(AggregatorFactory agg) ), subquery, configSupplier.get(), - bufferPool, subqueryResult ); @@ -229,7 +221,6 @@ public boolean apply(AggregatorFactory agg) outerQuery, null, configSupplier.get(), - bufferPool, Sequences.concat( Sequences.map( Sequences.simple(outerQuery.getIntervals()), @@ -274,7 +265,7 @@ public QueryRunner mergeRunners( final Iterable> queryRunners ) { - return new GroupByMergedQueryRunner<>(queryProcessingPool, configSupplier, queryWatcher, bufferPool, queryRunners); + return new GroupByMergedQueryRunner<>(queryProcessingPool, configSupplier, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 3621b8222a17..00677b1b10cf 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -348,7 +348,17 @@ protected abstract AddToFactsResult addToFacts( protected abstract AggregatorType[] getAggsForRow(int rowOffset); - protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition); + /** + * Return the current aggregated value for a particular aggregator, row number, and aggregator number. + * + * Note: rowOffset and aggPosition are unused today, but may be used in the future by + * https://github.com/apache/druid/pull/10001. + */ + protected abstract Object getAggVal( + AggregatorType agg, + @SuppressWarnings("unused") int rowOffset, + @SuppressWarnings("unused") int aggPosition + ); protected abstract float getMetricFloatValue(int rowOffset, int aggOffset); @@ -691,6 +701,7 @@ private long getMaxTimeMillis() return getFacts().getMaxTimeMillis(); } + @SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001 public AggregatorType[] getAggs() { return aggs; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java deleted file mode 100644 index a74f94fdb827..000000000000 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.incremental; - -import com.google.common.base.Supplier; -import org.apache.druid.collections.NonBlockingPool; -import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.segment.ColumnSelectorFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class OffheapIncrementalIndex extends IncrementalIndex -{ - private static final Logger log = new Logger(OffheapIncrementalIndex.class); - - private final NonBlockingPool bufferPool; - - private final List> aggBuffers = new ArrayList<>(); - private final List indexAndOffsets = new ArrayList<>(); - - private final FactsHolder facts; - - private final AtomicInteger indexIncrement = new AtomicInteger(0); - - protected final int maxRowCount; - - @Nullable - private volatile Map selectors; - - //given a ByteBuffer and an offset where all aggregates for a row are stored - //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate - //is stored - @Nullable - private volatile int[] aggOffsetInBuffer; - private volatile int aggsTotalSize; - - @Nullable - private String outOfRowsReason = null; - - OffheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd, - boolean sortFacts, - int maxRowCount, - NonBlockingPool bufferPool - ) - { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd); - this.maxRowCount = maxRowCount; - this.bufferPool = bufferPool; - - this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) - : new PlainFactsHolder(sortFacts, dimsComparator()); - - //check that stupid pool gives buffers that can hold at least one row's aggregators - ResourceHolder bb = bufferPool.take(); - if (bb.get().capacity() < aggsTotalSize) { - bb.close(); - throw new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); - } - aggBuffers.add(bb); - } - - @Override - public FactsHolder getFacts() - { - return facts; - } - - @Override - protected BufferAggregator[] initAggs( - final AggregatorFactory[] metrics, - final Supplier rowSupplier, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd - ) - { - selectors = new HashMap<>(); - aggOffsetInBuffer = new int[metrics.length]; - - int aggsCurOffsetInBuffer = 0; - - for (int i = 0; i < metrics.length; i++) { - AggregatorFactory agg = metrics[i]; - - ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( - agg, - rowSupplier, - deserializeComplexMetrics - ); - - selectors.put( - agg.getName(), - new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd) - ); - - aggOffsetInBuffer[i] = aggsCurOffsetInBuffer; - aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls(); - } - - aggsTotalSize = aggsCurOffsetInBuffer; - - return new BufferAggregator[metrics.length]; - } - - @Override - protected AddToFactsResult addToFacts( - InputRow row, - IncrementalIndexRow key, - ThreadLocal rowContainer, - Supplier rowSupplier, - boolean skipMaxRowsInMemoryCheck // ignored, we always want to check this for offheap - ) throws IndexSizeExceededException - { - synchronized (this) { - final AggregatorFactory[] metrics = getMetrics(); - final int priorIndex = facts.getPriorIndex(key); - if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) { - final int[] indexAndOffset = indexAndOffsets.get(priorIndex); - ByteBuffer aggBuffer = aggBuffers.get(indexAndOffset[0]).get(); - return aggregate(row, rowContainer, aggBuffer, indexAndOffset[1]); - } else { - if (metrics.length > 0 && getAggs()[0] == null) { - // note: creation of Aggregators is done lazily when at least one row from input is available - // so that FilteredAggregators could be initialized correctly. - rowContainer.set(row); - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - getAggs()[i] = agg.factorizeBuffered(selectors.get(agg.getName())); - } - rowContainer.set(null); - } - - int bufferIndex = aggBuffers.size() - 1; - ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); - int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() - ? null - : indexAndOffsets.get(indexAndOffsets.size() - 1); - - if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) { - throw new ISE("last row's aggregate's buffer and last buffer index must be same"); - } - - int bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0); - ByteBuffer aggBuffer; - if (lastBuffer != null && - lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { - aggBuffer = lastBuffer; - } else { - ResourceHolder bb = bufferPool.take(); - aggBuffers.add(bb); - bufferIndex = aggBuffers.size() - 1; - bufferOffset = 0; - aggBuffer = bb.get(); - } - - for (int i = 0; i < metrics.length; i++) { - getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); - } - - // Last ditch sanity checks - if (getNumEntries().get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { - throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); - } - - final int rowIndex = indexIncrement.getAndIncrement(); - - // note that we must update indexAndOffsets and the aggregator's buffers before facts, because as soon as we - // update facts concurrent readers get hold of it and might ask for newly added row - AddToFactsResult res = aggregate(row, rowContainer, aggBuffer, bufferOffset); - indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); - final int prev = facts.putIfAbsent(key, rowIndex); - if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { - getNumEntries().incrementAndGet(); - } else { - throw new ISE("Unexpected state: Concurrent fact addition."); - } - - return res; - } - } - } - - public AddToFactsResult aggregate( - InputRow row, - ThreadLocal rowContainer, - ByteBuffer aggBuffer, - int bufferOffset - ) - { - final List parseExceptionMessages = new ArrayList<>(); - - rowContainer.set(row); - for (int i = 0; i < getMetrics().length; i++) { - final BufferAggregator agg = getAggs()[i]; - - synchronized (agg) { - try { - agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); - } - catch (ParseException e) { - // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. - log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName()); - parseExceptionMessages.add(e.getMessage()); - } - } - } - rowContainer.set(null); - - return new AddToFactsResult(getNumEntries().get(), 0, parseExceptionMessages); - } - - - @Override - public int getLastRowIndex() - { - return indexIncrement.get() - 1; - } - - @Override - public boolean canAppendRow() - { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); - } - return canAdd; - } - - @Override - public String getOutOfRowsReason() - { - return outOfRowsReason; - } - - @Override - protected BufferAggregator[] getAggsForRow(int rowOffset) - { - return getAggs(); - } - - @Override - protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) - { - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]); - } - - @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public long getMetricLongValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public double getMetricDoubleValue(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getDouble(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - @Override - public boolean isNull(int rowOffset, int aggOffset) - { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.isNull(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); - } - - /** - * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing - */ - @Override - public void close() - { - super.close(); - facts.clear(); - indexAndOffsets.clear(); - - if (selectors != null) { - selectors.clear(); - } - - Closer c = Closer.create(); - aggBuffers.forEach(c::register); - try { - c.close(); - } - catch (IOException e) { - throw new RuntimeException(e); - } - aggBuffers.clear(); - } - - public static class Builder extends AppendableIndexBuilder - { - @Nullable - NonBlockingPool bufferPool = null; - - public Builder setBufferPool(final NonBlockingPool bufferPool) - { - this.bufferPool = bufferPool; - return this; - } - - @Override - public void validate() - { - super.validate(); - if (bufferPool == null) { - throw new IllegalArgumentException("bufferPool cannot be null"); - } - } - - @Override - protected OffheapIncrementalIndex buildInner() - { - return new OffheapIncrementalIndex( - Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), - deserializeComplexMetrics, - concurrentEventAdd, - sortFacts, - maxRowCount, - Objects.requireNonNull(bufferPool, "bufferPool is null") - ); - } - } -} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index f22701d8f69f..36e6b2c0272a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -357,8 +357,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, @@ -375,8 +374,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( tooSmallDruidProcessingConfig, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index 08365d50b4b9..ddd9936c1cf5 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -607,8 +607,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, @@ -625,8 +624,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 7b73cc149b59..d7ab55ade4cd 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -268,8 +268,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index faed09af603e..3e9a630a1724 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -132,8 +132,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, BUFFER_POOL), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - BUFFER_POOL + QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new GroupByStrategyV2( PROCESSING_CONFIG, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index f98785231f39..2dfe08e6ce33 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -102,8 +102,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, BUFFER_POOL), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - BUFFER_POOL + QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new GroupByStrategyV2( DEFAULT_PROCESSING_CONFIG, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 2401adc8a11e..5b3d3f1c5171 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -392,8 +392,7 @@ public ByteBuffer get() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - bufferPool + QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new GroupByStrategyV2( processingConfig, @@ -10965,7 +10964,7 @@ public void testTypeConversionWithMergingChainedExecutionRunner() (query1, future) -> { return; }, - ImmutableList.>of(runner, runner) + ImmutableList.of(runner, runner) ); QueryRunner mergingRunner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(ceqr)); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java index 00b55ad5e2ff..ba1a34456028 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -315,8 +315,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, @@ -333,8 +332,7 @@ public String getFormatString() new GroupByStrategyV1( configSupplier, new GroupByQueryEngine(configSupplier, bufferPool), - NOOP_QUERYWATCHER, - bufferPool + NOOP_QUERYWATCHER ), new GroupByStrategyV2( druidProcessingConfig, diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java index 3f112cc155fd..d59794854aa1 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java @@ -69,11 +69,6 @@ public static void addIndexSpec(Class c, String name) JSON_MAPPER.registerSubtypes(new NamedType(c, name)); } - static { - // The off-heap incremental-index is not registered for production, but we want to include it in the tests. - IncrementalIndexCreator.addIndexSpec(OffheapIncrementalIndexTestSpec.class, OffheapIncrementalIndexTestSpec.TYPE); - } - /** * Fetch all the available incremental-index implementations. * It can be used to parametrize the test. If more parameters are needed, use indexTypeCartesianProduct(). diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java b/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java deleted file mode 100644 index 925973199113..000000000000 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.incremental; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Supplier; -import org.apache.druid.collections.CloseableStupidPool; -import org.apache.druid.utils.JvmUtils; - -import javax.annotation.Nullable; -import java.io.Closeable; -import java.nio.ByteBuffer; - -/** - * OffheapIncrementalIndexTestSpec describes the off-heap indexing method for data ingestion. - * It also acts as a ByteBuffer supplier for the created off-heap incremental index. - * - * Note: since the off-heap incremental index is not yet supported in production ingestion, we define its spec here - * only for testing purposes. - */ -public class OffheapIncrementalIndexTestSpec implements AppendableIndexSpec, Supplier, Closeable -{ - public static final String TYPE = "offheap"; - static final int DEFAULT_BUFFER_SIZE = 1 << 23; - static final int DEFAULT_CACHE_SIZE = 1 << 30; - - final int bufferSize; - final int cacheSize; - - final CloseableStupidPool bufferPool; - - @JsonCreator - public OffheapIncrementalIndexTestSpec( - final @JsonProperty("bufferSize") @Nullable Integer bufferSize, - final @JsonProperty("cacheSize") @Nullable Integer cacheSize - ) - { - this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE; - this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE; - this.bufferPool = new CloseableStupidPool<>( - "Off-heap incremental-index buffer pool", - this, - 0, - this.cacheSize / this.bufferSize - ); - } - - @JsonProperty - public int getBufferSize() - { - return bufferSize; - } - - @JsonProperty - public int getCacheSize() - { - return cacheSize; - } - - @Override - public AppendableIndexBuilder builder() - { - return new OffheapIncrementalIndex.Builder().setBufferPool(bufferPool); - } - - @Override - public long getDefaultMaxBytesInMemory() - { - // In the realtime node, the entire JVM's direct memory is utilized for ingestion and persist operations. - // But maxBytesInMemory only refers to the active index size and not to the index being flushed to disk and the - // persist buffer. - // To account for that, we set default to 1/2 of the max JVM's direct memory. - return JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes() / 2; - } - - // Supplier and Closeable interface implementation - - @Override - public ByteBuffer get() - { - return ByteBuffer.allocateDirect(bufferSize); - } - - @Override - public void close() - { - bufferPool.close(); - } -}