@@ -24,6 +24,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -53,10 +54,13 @@
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
@@ -238,6 +242,30 @@ private void setupQueries()

basicQueries.put("nested", queryA);
}

{ // basic.filter
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
);
// Use multiple aggregators to see how the number of aggregators impacts query performance
List<AggregatorFactory> queryAggs = ImmutableList.of(
new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"),
new LongSumAggregatorFactory("rows", "rows"),
new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"),
new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf")
);
GroupByQuery queryA = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(ImmutableList.of(new DefaultDimensionSpec("dimUniform", null)))
.setAggregatorSpecs(queryAggs)
.setGranularity(Granularity.fromString(queryGranularity))
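// Editorial note: assuming the usual BoundDimFilter argument order
// (dimension, lower, upper, lowerStrict, upperStrict, alphaNumeric,
// extractionFn, ordering), the filter below keeps rows where
// 0 < dimUniform < 100, with the trailing nulls left at their defaults.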
.setDimFilter(new BoundDimFilter("dimUniform", "0", "100", true, true, null, null, null))
.build();

basicQueries.put("filter", queryA);
}
SCHEMA_QUERY_MAP.put("basic", basicQueries);

// simple one column schema, for testing performance difference between querying on numeric values as Strings and
docs/content/querying/groupbyquery.md (4 additions & 0 deletions)
@@ -153,6 +153,9 @@ threads. You can adjust this as necessary to balance concurrency and memory usage on
historical nodes.
- groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2
ignores chunkPeriod.
- groupBy v2 supports both array-based aggregation and hash-based aggregation. Array-based aggregation is used only
when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is
used as the array index, so the aggregated values in the array can be accessed directly without locating buckets via
hashing (see the sketch below).
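A minimal editorial sketch of the distinction (not Druid's actual grouper code; all names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    class GrouperSketch
    {
      private final long[] arraySlots;  // one aggregate slot per dictionary ID
      private final Map<Integer, Long> hashBuckets = new HashMap<>();

      GrouperSketch(int dictionarySize)
      {
        this.arraySlots = new long[dictionarySize];
      }

      // Array-based: the dictionary-encoded value is itself the index, so the
      // slot is reached directly, with no hashing or key comparison.
      void aggregateArrayBased(int dictId, long value)
      {
        arraySlots[dictId] += value;
      }

      // Hash-based: the key must first be hashed and its bucket located.
      void aggregateHashBased(int key, long value)
      {
        hashBuckets.merge(key, value, Long::sum);
      }
    }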

#### Memory tuning and resource limits

@@ -246,6 +249,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|
|`forceHashAggregation`|Forces the use of hash-based aggregation.|
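For example, the flag can be supplied per query through the query context. A minimal sketch in the style of the benchmark change above (`intervalSpec` and the column names are placeholders, and `setContext` is assumed to accept a context map as in Druid's other query builders):

    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource("sample_source")
        .setQuerySegmentSpec(intervalSpec)
        .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dim", null)))
        .setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new LongSumAggregatorFactory("sum", "sum")))
        .setGranularity(Granularity.fromString("all"))
        // Forces the hash-based grouper for this query only:
        .setContext(ImmutableMap.<String, Object>of("forceHashAggregation", true))
        .build();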

When using the "v1" strategy, the following query context parameters apply:

@@ -27,7 +27,7 @@
import io.druid.data.input.MapBasedRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.groupby.epinephelinae.BufferGrouper;
import io.druid.query.groupby.epinephelinae.BufferHashGrouper;
import io.druid.query.groupby.epinephelinae.Grouper;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
@@ -36,15 +36,15 @@

import java.nio.ByteBuffer;

public class BufferGrouperUsingSketchMergeAggregatorFactoryTest
public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
{
private static BufferGrouper<Integer> makeGrouper(
private static BufferHashGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets
)
{
final BufferGrouper<Integer> grouper = new BufferGrouper<>(
final BufferHashGrouper<Integer> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
@@ -31,10 +31,10 @@ public class ConcatSequence<T> implements Sequence<T>
private final Sequence<Sequence<T>> baseSequences;

public ConcatSequence(
Sequence<Sequence<T>> baseSequences
Sequence<? extends Sequence<? extends T>> baseSequences
)
{
this.baseSequences = baseSequences;
this.baseSequences = (Sequence<Sequence<T>>) baseSequences;
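// Editorial note: the unchecked cast is safe in practice because Sequence is
// read-only (a producer), so the covariant argument can be consumed as a
// Sequence<Sequence<T>>.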
}

@Override
@@ -21,6 +21,7 @@

import com.google.common.collect.Ordering;

import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.function.Function;

@@ -68,6 +69,13 @@ default <U> Sequence<U> map(Function<? super T, ? extends U> mapper)
return new MappedSequence<>(this, mapper);
}

default <R> Sequence<R> flatMap(
Function<? super T, ? extends Sequence<? extends R>> mapper
)
{
return new ConcatSequence<>(this.map(mapper));
}
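A usage sketch for the new flatMap (editorial; assumes java.util.Arrays and the Sequences.simple factory that appears later in this diff):

    // Maps each element to a sub-sequence, then concatenates the results in order.
    Sequence<Integer> flattened = Sequences
        .simple(Arrays.asList(1, 2, 3))
        .flatMap(i -> Sequences.simple(Arrays.asList(i, i * 10)));
    // Consuming "flattened" yields: 1, 10, 2, 20, 3, 30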

default <R> Sequence<R> flatMerge(
Function<? super T, ? extends Sequence<? extends R>> mapper,
Ordering<? super R> ordering
@@ -80,4 +88,9 @@ default Sequence<T> withEffect(Runnable effect, Executor effectExecutor)
{
return Sequences.withEffect(this, effect, effectExecutor);
}

default Sequence<T> withBaggage(Closeable baggage)
{
return Sequences.withBaggage(this, baggage);
}
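And a sketch for withBaggage, which ties a resource's lifetime to the sequence (editorial; `openResource` and `readAsSequence` are hypothetical):

    // The Closeable is closed once the sequence is fully consumed or its
    // consumption fails, so the resource cannot leak past iteration.
    Closeable handle = openResource();
    Sequence<String> lines = readAsSequence(handle).withBaggage(handle);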
}
@@ -75,7 +75,7 @@ public static <T> Sequence<T> concat(Iterable<Sequence<T>> sequences)
return concat(Sequences.simple(sequences));
}

public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
public static <T> Sequence<T> concat(Sequence<? extends Sequence<T>> sequences)
{
return new ConcatSequence<>(sequences);
}
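The widened bound exists to support the new Sequence.flatMap above; a minimal sketch of why it is needed (assuming java.util.Arrays):

    // With the wildcard bound, concat accepts this wildcard-typed sequence, which
    // the previous Sequence<Sequence<T>> parameter rejected without a cast.
    Sequence<? extends Sequence<Integer>> mapped = Sequences
        .simple(Arrays.asList(1, 2, 3))
        .map(i -> Sequences.simple(Arrays.asList(i, i + 1)));
    Sequence<Integer> flat = Sequences.concat(mapped); // yields 1, 2, 2, 3, 3, 4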
@@ -36,6 +36,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";

@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@@ -70,6 +71,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean forcePushDownLimit = false;

@JsonProperty
private boolean forceHashAggregation = false;

public String getDefaultStrategy()
{
return defaultStrategy;
@@ -134,6 +138,11 @@ public boolean isForcePushDownLimit()
{
return forcePushDownLimit;
}

public boolean isForceHashAggregation()
{
return forceHashAggregation;
}

public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
@@ -169,6 +178,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
getMaxMergingDictionarySize()
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
return newConfig;
}
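To see how the new flag reaches a query (editorial sketch; `globalConfig` is an assumed GroupByQueryConfig instance and `query` carries the context entry shown in the docs change above):

    // withOverrides copies the global config, applying per-query context values.
    GroupByQueryConfig perQueryConfig = globalConfig.withOverrides(query);
    // If the query context contained {"forceHashAggregation": true}:
    boolean forced = perQueryConfig.isForceHashAggregation(); // true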

@@ -185,6 +195,8 @@ public String toString()
", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets +
", maxMergingDictionarySize=" + maxMergingDictionarySize +
", maxOnDiskStorage=" + maxOnDiskStorage +
", forcePushDownLimit=" + forcePushDownLimit +
", forceHashAggregation=" + forceHashAggregation +
'}';
}
}
@@ -28,21 +28,10 @@

import java.nio.ByteBuffer;

public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyType>
{
private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
"Not enough dictionary space to execute this query. Try increasing "
+ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);
private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure(
"Not enough aggregation table space to execute this query. Try increasing "
+ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
);

protected static final int HASH_SIZE = Ints.BYTES;
protected static final Logger log = new Logger(AbstractBufferGrouper.class);
protected static final Logger log = new Logger(AbstractBufferHashGrouper.class);

protected final Supplier<ByteBuffer> bufferSupplier;
protected final KeySerde<KeyType> keySerde;
@@ -61,7 +50,8 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
protected ByteBufferHashTable hashTable;
protected ByteBuffer hashTableBuffer; // buffer for the entire hash table (total space, not individual growth)

public AbstractBufferGrouper(
public AbstractBufferHashGrouper(
// the buffer returned by the supplier below may contain dirty (stale) data and should be cleared during initialization
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final AggregatorFactory[] aggregatorFactories,
@@ -77,7 +67,7 @@ public AbstractBufferGrouper(
}

/**
* Called when a new bucket is used for an entry in the hash table. An implementing BufferGrouper class
* Called when a new bucket is used for an entry in the hash table. An implementing BufferHashGrouper class
* can use this to update its own state, e.g. tracking bucket offsets in a structure outside of the hash table.
*
* @param bucketOffset offset of the new bucket, within the buffer returned by hashTable.getTableBuffer()
@@ -95,7 +85,7 @@ public AbstractBufferGrouper(
public abstract boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset);

/**
* Called after a row is aggregated. An implementing BufferGrouper class can use this to update
* Called after a row is aggregated. An implementing BufferHashGrouper class can use this to update
* its own state, e.g. reading the new aggregated values for the row's key and acting on that information.
*
* @param bucketOffset Offset of the bucket containing the row that was aggregated,
@@ -134,7 +124,7 @@ public AggregateResult aggregate(KeyType key, int keyHash)
if (keyBuffer == null) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
return DICTIONARY_FULL;
return Groupers.DICTIONARY_FULL;
}

if (keyBuffer.remaining() != keySize) {
@@ -150,7 +140,7 @@ public AggregateResult aggregate(KeyType key, int keyHash)
if (bucket < 0) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
return HASHTABLE_FULL;
return Groupers.HASH_TABLE_FULL;
}

final int bucketStartOffset = hashTable.getOffsetForBucket(bucket);
@@ -181,12 +171,6 @@ public AggregateResult aggregate(KeyType key, int keyHash)
return AggregateResult.ok();
}

@Override
public AggregateResult aggregate(final KeyType key)
{
return aggregate(key, Groupers.hash(key));
}

@Override
public void close()
{