Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,7 @@ public String getFormatString()
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
bufferPool
QueryBenchmarkUtil.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
druidProcessingConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
bufferPool
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
processingConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,7 @@ public String getFormatString()
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
bufferPool
QueryBenchmarkUtil.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
druidProcessingConfig,
Expand Down
6 changes: 3 additions & 3 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2090,9 +2090,9 @@ Supported query contexts:

|Key|Description|Default|
|---|-----------|-------|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|
|`maxIntermediateRows`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for a groupBy v1 query.|None|
|`maxResults`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxResults` for a groupBy v1 query.|None|
|`useOffheap`|Ignored by groupBy v2, and no longer supported for groupBy v1. Enabling this option with groupBy v1 will result in an error. For off-heap aggregation, switch to groupBy v2, which always operates off-heap.|false|

### Router

Expand Down
6 changes: 3 additions & 3 deletions docs/querying/groupbyquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,9 @@ Supported query contexts:

|Key|Description|Default|
|---|-----------|-------|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|
|`maxIntermediateRows`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for a groupBy v1 query.|None|
|`maxResults`|Ignored by groupBy v2. Can be used to lower the value of `druid.query.groupBy.maxResults` for a groupBy v1 query.|None|
|`useOffheap`|Ignored by groupBy v2, and no longer supported for groupBy v1. Enabling this option with groupBy v1 will result in an error. For off-heap aggregation, switch to groupBy v2, which always operates off-heap.|false|

#### Array based result rows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.query.aggregation.datasketches.tuple;

import com.google.common.util.concurrent.Striped;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketches;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch;
Expand All @@ -31,12 +30,9 @@
import org.apache.druid.segment.data.IndexedInts;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
* This aggregator builds sketches from raw data.
Expand All @@ -45,16 +41,12 @@
*/
public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregator
{

private static final int NUM_STRIPES = 64; // for locking per buffer position (power of 2 to make index computation faster)

private final DimensionSelector keySelector;
private final BaseDoubleColumnValueSelector[] valueSelectors;
private final int nominalEntries;
private final int maxIntermediateSize;
@Nullable
private double[] values; // not part of the state, but to reuse in aggregate() method
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);

public ArrayOfDoublesSketchBuildBufferAggregator(
final DimensionSelector keySelector,
Expand All @@ -76,15 +68,10 @@ public void init(final ByteBuffer buf, final int position)
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(valueSelectors.length)
.setNumberOfValues(valueSelectors.length).build(region);
.setNumberOfValues(valueSelectors.length)
.setNumberOfValues(valueSelectors.length).build(region);
}

/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* https://github.com/apache/druid/pull/3956
*/
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
Expand All @@ -101,24 +88,14 @@ public void aggregate(final ByteBuffer buf, final int position)
// might be considered, but it would increase complexity including relocate() support.
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
lock.lock();
try {
final ArrayOfDoublesUpdatableSketch sketch = ArrayOfDoublesSketches.wrapUpdatableSketch(region);
for (int i = 0, keysSize = keys.size(); i < keysSize; i++) {
final String key = keySelector.lookupName(keys.get(i));
sketch.update(key, values);
}
}
finally {
lock.unlock();
final ArrayOfDoublesUpdatableSketch sketch = ArrayOfDoublesSketches.wrapUpdatableSketch(region);
for (int i = 0, keysSize = keys.size(); i < keysSize; i++) {
final String key = keySelector.lookupName(keys.get(i));
sketch.update(key, values);
}
}

/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* https://github.com/apache/druid/pull/3956
* The returned sketch is a separate instance of ArrayOfDoublesCompactSketch
* representing the current state of the aggregation, and is not affected by consequent
* aggregate() calls
Expand All @@ -128,16 +105,9 @@ public Object get(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
lock.lock();
try {
final ArrayOfDoublesUpdatableSketch sketch = (ArrayOfDoublesUpdatableSketch) ArrayOfDoublesSketches
.wrapSketch(region);
return sketch.compact();
}
finally {
lock.unlock();
}
final ArrayOfDoublesUpdatableSketch sketch = (ArrayOfDoublesUpdatableSketch) ArrayOfDoublesSketches
.wrapSketch(region);
return sketch.compact();
}

@Override
Expand All @@ -164,18 +134,4 @@ public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
inspector.visit("keySelector", keySelector);
inspector.visit("valueSelectors", valueSelectors);
}

// compute lock index to avoid boxing in Striped.get() call
static int lockIndex(final int position)
{
return smear(position) % NUM_STRIPES;
}

// from https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
private static int smear(int hashCode)
{
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.query.aggregation.datasketches.tuple;

import com.google.common.util.concurrent.Striped;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSetOperationBuilder;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
Expand All @@ -31,8 +30,6 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
* This aggregator merges existing sketches.
Expand All @@ -41,14 +38,10 @@
*/
public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregator
{

private static final int NUM_STRIPES = 64; // for locking per buffer position

private final BaseObjectColumnValueSelector<ArrayOfDoublesSketch> selector;
private final int nominalEntries;
private final int numberOfValues;
private final int maxIntermediateSize;
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);

public ArrayOfDoublesSketchMergeBufferAggregator(
final BaseObjectColumnValueSelector<ArrayOfDoublesSketch> selector,
Expand All @@ -69,14 +62,9 @@ public void init(final ByteBuffer buf, final int position)
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(numberOfValues).buildUnion(region);
.setNumberOfValues(numberOfValues).buildUnion(region);
}

/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* https://github.com/apache/druid/pull/3956
*/
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
Expand All @@ -86,18 +74,11 @@ public void aggregate(final ByteBuffer buf, final int position)
}
// Wrapping memory and ArrayOfDoublesUnion is inexpensive compared to union operations.
// Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator
// might might be considered, but it would increase complexity including relocate() support.
// might be considered, but it would increase complexity including relocate() support.
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).writeLock();
lock.lock();
try {
final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
union.union(update);
}
finally {
lock.unlock();
}
final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
union.union(update);
}

/**
Expand All @@ -113,15 +94,8 @@ public Object get(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).readLock();
lock.lock();
try {
final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
return union.getResult();
}
finally {
lock.unlock();
}
final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
return union.getResult();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
Expand All @@ -44,7 +43,6 @@
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
Expand All @@ -59,22 +57,19 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
private final Iterable<QueryRunner<T>> queryables;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final QueryProcessingPool queryProcessingPool;

public GroupByMergedQueryRunner(
QueryProcessingPool queryProcessingPool,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
NonBlockingPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<T>> queryables
)
{
this.queryProcessingPool = queryProcessingPool;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
}

@Override
Expand All @@ -86,8 +81,7 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
null,
querySpecificConfig,
bufferPool
querySpecificConfig
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = QueryContexts.isBySegment(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
* Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls
* to aggregate(). This is currently (as of this documentation) implemented through the use of {@link
* org.apache.druid.segment.ColumnValueSelector} objects.
*
* During ingestion, {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex} uses instances of this class
* from multiple threads. In particular, ingestion threads call {@link #aggregate()} simultaneously with query threads
* calling the various "get" methods. Instances of this class must be thread-safe.
*/
@ExtensionPoint
public interface Aggregator extends Closeable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
* Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls
* to aggregate(...).
*
* Unlike {@link Aggregator}, BufferAggregators are never used by multiple threads at once. Implementations are not
* required to be thread-safe.
*
* @see VectorAggregator, the vectorized version
*/
@ExtensionPoint
Expand Down Expand Up @@ -108,6 +111,7 @@ public interface BufferAggregator extends HotLoopCallee
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the float representation of the aggregate
*/
@SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001
float getFloat(ByteBuffer buf, int position);

/**
Expand All @@ -125,6 +129,7 @@ public interface BufferAggregator extends HotLoopCallee
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the long representation of the aggregate
*/
@SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001
long getLong(ByteBuffer buf, int position);

/**
Expand All @@ -146,6 +151,7 @@ public interface BufferAggregator extends HotLoopCallee
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the double representation of the aggregate
*/
@SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001
default double getDouble(ByteBuffer buf, int position)
{
return (double) getFloat(buf, position);
Expand Down Expand Up @@ -204,6 +210,7 @@ default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, By
*
* @return true if the aggregated value is primitive long/double/float and aggregated value is null otherwise false.
*/
@SuppressWarnings("unused") // Unused today, but may be used in the future by https://github.com/apache/druid/pull/10001
default boolean isNull(ByteBuffer buf, int position)
{
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
* {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop} because vectorized query engines do not use
* monomorphic-processing-style specialization.
*
* Unlike {@link Aggregator}, VectorAggregators are never used by multiple threads at once. Implementations are not
* required to be thread-safe.
*
* @see BufferAggregator, the non-vectorized version.
*/
public interface VectorAggregator
Expand Down
Loading