Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.segment.Capabilities;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void run(
@Nullable TopNQueryMetrics queryMetrics
)
{
if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) {
if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) {
runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics);
} else {
runWithCardinalityUnknown(params, resultBuilder, queryMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@

import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;

import java.util.Map;

/**
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
* Heap based topn algorithm that handles aggregates on dimension extractions and numeric typed dimension columns.
*
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle
* multiple index numerals referencing the same dimension value.
*/
public class DimExtractionTopNAlgorithm
extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
public class HeapBasedTopNAlgorithm
extends BaseTopNAlgorithm<Aggregator[][], TopNColumnAggregatesProcessor, TopNParams>
{
private final TopNQuery query;

public DimExtractionTopNAlgorithm(
public HeapBasedTopNAlgorithm(
StorageAdapter storageAdapter,
TopNQuery query
)
Expand All @@ -47,7 +48,7 @@ public DimExtractionTopNAlgorithm(

@Override
public TopNParams makeInitParams(
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
final Cursor cursor
)
{
Expand All @@ -64,8 +65,8 @@ protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed,
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter);
ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter);
}

@Override
Expand All @@ -75,54 +76,46 @@ protected Aggregator[][] updateDimValSelector(Aggregator[][] aggregators, int nu
}

@Override
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params)
{
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore();
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy();
}

@Override
public long scanAndAggregate(
protected long scanAndAggregate(
TopNParams params,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore
TopNColumnAggregatesProcessor processor
)
{
final Cursor cursor = params.getCursor();
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();

return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate(
processor.initAggregateStore();
return processor.scanAndAggregate(
query,
selectorPlus.getSelector(),
cursor,
rowSelector,
aggregatesStore
rowSelector
);
}

@Override
protected void updateResults(
TopNParams params,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore,
Aggregator[][] aggregators,
TopNColumnAggregatesProcessor processor,
TopNResultBuilder resultBuilder
)
{
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
aggregatesStore,
resultBuilder
);
processor.updateResults(resultBuilder);
}

@Override
protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
protected void closeAggregators(TopNColumnAggregatesProcessor processor)
{
for (Aggregator[] aggregators : valueMap.values()) {
for (Aggregator agg : aggregators) {
agg.close();
}
}
processor.closeAggregators();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable<?>, Aggregator[]>, TopNParams>
{
private static final int[] EMPTY_INTS = new int[]{};

Expand Down Expand Up @@ -74,17 +75,16 @@ protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int
}

@Override
@SuppressWarnings("unchecked")
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
protected Map<Comparable<?>, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
return new HashMap<>();
}

@Override
protected long scanAndAggregate(
TopNParams params,
int[] dimValSelector,
Map<Comparable, Aggregator[]> aggregatesStore
Map<Comparable<?>, Aggregator[]> aggregatesStore
)
{
if (params.getCardinality() < 0) {
Expand All @@ -96,7 +96,7 @@ protected long scanAndAggregate(

long processedRows = 0;
while (!cursor.isDone()) {
final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
final Comparable<?> key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));

Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
Expand All @@ -118,11 +118,11 @@ protected long scanAndAggregate(
protected void updateResults(
TopNParams params,
int[] dimValSelector,
Map<Comparable, Aggregator[]> aggregatesStore,
Map<Comparable<?>, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
Expand All @@ -140,7 +140,7 @@ protected void updateResults(
}

@Override
protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
protected void closeAggregators(Map<Comparable<?>, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;

import javax.annotation.Nullable;
Expand All @@ -34,7 +34,7 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
int INIT_POSITION_VALUE = -1;
int SKIP_POSITION_VALUE = -2;

TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus, Cursor cursor);
TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus, Cursor cursor);

void run(
Parameters params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;

Expand All @@ -42,13 +43,15 @@ public TopNMapFn(
}

@SuppressWarnings("unchecked")
@Nullable
public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
{
final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
query.getDimensionSpec(),
cursor.getColumnSelectorFactory()
);
final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
DimensionHandlerUtils.createColumnSelectorPlus(
new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
query.getDimensionSpec(),
cursor.getColumnSelectorFactory()
);

if (selectorPlus.getSelector() == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
package org.apache.druid.query.topn;

import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;

/**
*/
public class TopNParams
{
public static final int CARDINALITY_UNKNOWN = -1;
private final Cursor cursor;
private final int cardinality;
private final int numValuesPerPass;
private final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus;
private final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus;

protected TopNParams(
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
Cursor cursor,
int numValuesPerPass
)
Expand All @@ -52,7 +53,7 @@ public DimensionSelector getDimSelector()
return (DimensionSelector) selectorPlus.getSelector();
}

public ColumnSelectorPlus<TopNColumnSelectorStrategy> getSelectorPlus()
public ColumnSelectorPlus<TopNColumnAggregatesProcessor> getSelectorPlus()
{
return selectorPlus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.query.topn;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import org.apache.druid.collections.NonBlockingPool;
Expand All @@ -30,7 +29,6 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
Expand Down Expand Up @@ -86,16 +84,11 @@ public Sequence<Result<TopNResultValue>> query(
query.isDescending(),
queryMetrics
),
new Function<Cursor, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(Cursor input)
{
if (queryMetrics != null) {
queryMetrics.cursor(input);
}
return mapFn.apply(input, queryMetrics);
input -> {
if (queryMetrics != null) {
queryMetrics.cursor(input);
}
return mapFn.apply(input, queryMetrics);
}
),
Predicates.notNull()
Expand Down Expand Up @@ -125,7 +118,8 @@ private TopNMapFn getMapFn(
final ColumnCapabilities columnCapabilities = query.getVirtualColumns()
.getColumnCapabilitiesWithFallback(adapter, dimension);

final TopNAlgorithm topNAlgorithm;

final TopNAlgorithm<?, ?> topNAlgorithm;
if (
selector.isHasExtractionFn() &&
// TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
Expand All @@ -137,20 +131,23 @@ private TopNMapFn getMapFn(
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
&& columnCapabilities.isDictionaryEncoded())) {
// Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
// Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
// Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
// a many-to-one mapping, since numeric types can't represent all possible values of other types.)
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (selector.isAggregateAllMetrics()) {
// sorted by dimension
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
// high cardinality dimensions with larger result sets
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
} else {
// anything else
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
if (queryMetrics != null) {
Expand Down
Loading