From ae47598b211d8dccbe6eb919177347a9e0e90df2 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 13 Jan 2020 19:12:26 -0800 Subject: [PATCH 1/7] fix topn issue with aggregating on numeric columns with null values --- .../druid/query/topn/BaseTopNAlgorithm.java | 3 +- .../topn/DimExtractionTopNAlgorithm.java | 47 ++- .../topn/TimeExtractionTopNAlgorithm.java | 5 +- .../druid/query/topn/TopNAlgorithm.java | 4 +- .../apache/druid/query/topn/TopNMapFn.java | 4 +- .../apache/druid/query/topn/TopNParams.java | 9 +- .../DoubleTopNColumnAggregatesProcessor.java | 77 +++++ .../FloatTopNColumnAggregatesProcessor.java | 77 +++++ ...apBasedTopNColumnAggregatesProcessor.java} | 43 +-- ...dTopNColumnAggregatesProcessorFactory.java | 82 ++++++ .../LongTopNColumnAggregatesProcessor.java | 73 +++++ ...eNumericTopNColumnAggregatesProcessor.java | 137 +++++++++ .../NumericTopNColumnSelectorStrategy.java | 277 ------------------ ... StringTopNColumnAggregatesProcessor.java} | 78 ++--- .../TopNColumnSelectorStrategyFactory.java | 71 ----- .../druid/sql/calcite/CalciteQueryTest.java | 120 ++++++++ 16 files changed, 647 insertions(+), 460 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java rename processing/src/main/java/org/apache/druid/query/topn/types/{TopNColumnSelectorStrategy.java => HeapBasedTopNColumnAggregatesProcessor.java} (71%) create mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessorFactory.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java delete mode 100644 
processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java rename processing/src/main/java/org/apache/druid/query/topn/types/{StringTopNColumnSelectorStrategy.java => StringTopNColumnAggregatesProcessor.java} (78%) delete mode 100644 processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index bb49c357f3fe..3a1fdbdadfd1 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; import org.apache.druid.segment.Capabilities; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; @@ -79,7 +78,7 @@ public void run( @Nullable TopNQueryMetrics queryMetrics ) { - if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) { + if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) { runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics); } else { runWithCardinalityUnknown(params, resultBuilder, queryMetrics); diff --git a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java index 8f3941bc4a07..823397bbc813 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java @@ -21,17 +21,15 @@ import org.apache.druid.query.ColumnSelectorPlus; 
import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; +import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; -import java.util.Map; - /** * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value. */ public class DimExtractionTopNAlgorithm - extends BaseTopNAlgorithm, TopNParams> + extends BaseTopNAlgorithm { private final TopNQuery query; @@ -47,7 +45,7 @@ public DimExtractionTopNAlgorithm( @Override public TopNParams makeInitParams( - final ColumnSelectorPlus selectorPlus, + final ColumnSelectorPlus selectorPlus, final Cursor cursor ) { @@ -64,8 +62,8 @@ protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed, if (params.getCardinality() < 0) { throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); } - ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter); + ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter); } @Override @@ -75,54 +73,45 @@ protected Aggregator[][] updateDimValSelector(Aggregator[][] aggregators, int nu } @Override - protected Map makeDimValAggregateStore(TopNParams params) + protected HeapBasedTopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params) { - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + return selectorPlus.getColumnSelectorStrategy(); } @Override - public long 
scanAndAggregate( + protected long scanAndAggregate( TopNParams params, Aggregator[][] rowSelector, - Map aggregatesStore + HeapBasedTopNColumnAggregatesProcessor processor ) { final Cursor cursor = params.getCursor(); - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate( + return processor.scanAndAggregate( query, selectorPlus.getSelector(), cursor, - rowSelector, - aggregatesStore + rowSelector ); } @Override protected void updateResults( TopNParams params, - Aggregator[][] rowSelector, - Map aggregatesStore, + Aggregator[][] aggregators, + HeapBasedTopNColumnAggregatesProcessor processor, TopNResultBuilder resultBuilder ) { - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults( - aggregatesStore, - resultBuilder - ); + processor.updateResults(resultBuilder); } @Override - protected void closeAggregators(Map valueMap) + protected void closeAggregators(HeapBasedTopNColumnAggregatesProcessor processor) { - for (Aggregator[] aggregators : valueMap.values()) { - for (Aggregator agg : aggregators) { - agg.close(); - } - } + processor.closeAggregators(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 7d4c23cee23b..5a244608b892 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ValueType; +import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -77,7 +78,7 @@ protected int[] updateDimValSelector(int[] 
dimValSelector, int numProcessed, int @SuppressWarnings("unchecked") protected Map makeDimValAggregateStore(TopNParams params) { - return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore(); + return new HashMap<>(); } @Override @@ -96,7 +97,7 @@ protected long scanAndAggregate( long processedRows = 0; while (!cursor.isDone()) { - final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); + final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); Aggregator[] theAggregators = aggregatesStore.get(key); if (theAggregators == null) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java index 0f7a52707a01..bb2d2e0d4450 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java @@ -21,7 +21,7 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; +import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import javax.annotation.Nullable; @@ -34,7 +34,7 @@ public interface TopNAlgorithm int INIT_POSITION_VALUE = -1; int SKIP_POSITION_VALUE = -2; - TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); + TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); void run( Parameters params, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java index af12047005a5..98dd6be6176a 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java +++ 
b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java @@ -21,7 +21,7 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.Result; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory; +import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionHandlerUtils; @@ -45,7 +45,7 @@ public TopNMapFn( public Result apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics) { final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( - new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()), + new HeapBasedTopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()), query.getDimensionSpec(), cursor.getColumnSelectorFactory() ); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java index a80e1bd1f732..389c6a1cf244 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java @@ -20,7 +20,7 @@ package org.apache.druid.query.topn; import org.apache.druid.query.ColumnSelectorPlus; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy; +import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; @@ -28,13 +28,14 @@ */ public class TopNParams { + public static final int CARDINALITY_UNKNOWN = -1; private final Cursor cursor; private final int cardinality; private final int numValuesPerPass; - private final ColumnSelectorPlus selectorPlus; + private final ColumnSelectorPlus selectorPlus; protected TopNParams( - ColumnSelectorPlus selectorPlus, + ColumnSelectorPlus selectorPlus, Cursor cursor, int 
numValuesPerPass ) @@ -52,7 +53,7 @@ public DimensionSelector getDimSelector() return (DimensionSelector) selectorPlus.getSelector(); } - public ColumnSelectorPlus getSelectorPlus() + public ColumnSelectorPlus getSelectorPlus() { return selectorPlus; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java new file mode 100644 index 000000000000..a47b9a82f51a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class DoubleTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Long2ObjectMap aggregatesStore; + + protected DoubleTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators( + TopNQuery query, + BaseDoubleColumnValueSelector selector, + Cursor cursor + ) + { + long key = Double.doubleToLongBits(selector.getDouble()); + Aggregator[] aggs = aggregatesStore.get(key); + if (aggs == null) { + aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + aggregatesStore.put(key, aggs); + } + return aggs; + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + aggregatesStore = new Long2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java new file mode 100644 index 000000000000..42c36a86b8f5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class FloatTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Int2ObjectMap aggregatesStore; + + protected FloatTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators( + TopNQuery query, + BaseFloatColumnValueSelector selector, + Cursor cursor + ) + { + int key = Float.floatToIntBits(selector.getFloat()); + Aggregator[] aggs = aggregatesStore.get(key); + if (aggs == null) { + aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + aggregatesStore.put(key, aggs); + } + return aggs; + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + this.aggregatesStore = new 
Int2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java similarity index 71% rename from processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java index 464189d487d7..4cb5a9e1fa47 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java @@ -27,13 +27,8 @@ import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; -import java.util.Map; - -public interface TopNColumnSelectorStrategy - extends ColumnSelectorStrategy +public interface HeapBasedTopNColumnAggregatesProcessor extends ColumnSelectorStrategy { - int CARDINALITY_UNKNOWN = -1; - int getCardinality(ValueSelectorType selector); /** @@ -53,17 +48,7 @@ public interface TopNColumnSelectorStrategy +{ + private final ValueType dimensionType; + + public HeapBasedTopNColumnAggregatesProcessorFactory(final ValueType dimensionType) + { + this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); + } + + @Override + public HeapBasedTopNColumnAggregatesProcessor makeColumnSelectorStrategy( + ColumnCapabilities capabilities, + ColumnValueSelector selector + ) + { + final ValueType selectorType = capabilities.getType(); + + if (selectorType.equals(ValueType.STRING)) { + return new StringTopNColumnAggregatesProcessor(dimensionType); + } else if 
(selectorType.isNumeric()) { + final Function> converter; + final ValueType strategyType; + // When the selector is numeric, we want to use a NullableNumericTopNColumnAggregatesProcessor. It aggregates using + // a numeric type and then converts to the desired output type after aggregating. We must be careful not to + // convert to an output type that cannot represent all possible values of the input type. + if (ValueType.isNumeric(dimensionType)) { + // Return strategy that aggregates using the _output_ type, because this allows us to collapse values + // properly (numeric types cannot always represent all values of other numeric types). + converter = DimensionHandlerUtils.converterFromTypeToType(dimensionType, dimensionType); + strategyType = dimensionType; + } else { + // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can + // represent all possible values of the input type. This will be true for STRING, which is the only + // non-numeric type currently supported. 
+ converter = DimensionHandlerUtils.converterFromTypeToType(selectorType, dimensionType); + strategyType = selectorType; + } + switch (strategyType) { + case LONG: + return new LongTopNColumnAggregatesProcessor(converter); + case FLOAT: + return new FloatTopNColumnAggregatesProcessor(converter); + case DOUBLE: + return new DoubleTopNColumnAggregatesProcessor(converter); + } + } + + throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java new file mode 100644 index 000000000000..41b220ae7597 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.topn.types; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.Cursor; + +import java.util.Map; +import java.util.function.Function; + +public class LongTopNColumnAggregatesProcessor + extends NullableNumericTopNColumnAggregatesProcessor +{ + private Long2ObjectMap aggregatesStore; + + public LongTopNColumnAggregatesProcessor(Function> converter) + { + super(converter); + } + + @Override + Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor) + { + long key = selector.getLong(); + Aggregator[] aggs = aggregatesStore.get(key); + if (aggs == null) { + aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + aggregatesStore.put(key, aggs); + } + return aggs; + } + + @Override + public void initAggregateStore() + { + nullValueAggregates = null; + aggregatesStore = new Long2ObjectOpenHashMap<>(); + } + + @Override + Map getAggregatesStore() + { + return aggregatesStore; + } + + @Override + Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) + { + return converter.apply(aggregatorStoreKey); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java new file mode 100644 index 000000000000..72c573d70d3c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.types; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.topn.BaseTopNAlgorithm; +import org.apache.druid.query.topn.TopNParams; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNResultBuilder; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.StorageAdapter; + +import java.util.Map; +import java.util.function.Function; + +public abstract class NullableNumericTopNColumnAggregatesProcessor + implements HeapBasedTopNColumnAggregatesProcessor +{ + private final boolean hasNulls = !NullHandling.replaceWithDefault(); + final Function> converter; + Aggregator[] nullValueAggregates; + + protected NullableNumericTopNColumnAggregatesProcessor(Function> converter) + { + this.converter = converter; + } + + abstract Aggregator[] getValueAggregators(TopNQuery query, Selector selector, Cursor cursor); + + abstract Map getAggregatesStore(); + + abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey); + + @Override + public int getCardinality(Selector selector) + { + return 
TopNParams.CARDINALITY_UNKNOWN; + } + + @Override + public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) + { + return null; + } + + @Override + public long scanAndAggregate( + TopNQuery query, + Selector selector, + Cursor cursor, + Aggregator[][] rowSelector + ) + { + initAggregateStore(); + long processedRows = 0; + while (!cursor.isDone()) { + if (hasNulls && selector.isNull()) { + if (nullValueAggregates == null) { + nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + } + for (Aggregator aggregator : nullValueAggregates) { + aggregator.aggregate(); + } + } else { + Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor); + for (Aggregator aggregator : valueAggregates) { + aggregator.aggregate(); + } + } + cursor.advance(); + processedRows++; + } + return processedRows; + } + + + @Override + public void updateResults(TopNResultBuilder resultBuilder) + { + for (Map.Entry entry : getAggregatesStore().entrySet()) { + Aggregator[] aggs = entry.getValue(); + if (aggs != null) { + Object[] vals = new Object[aggs.length]; + for (int i = 0; i < aggs.length; i++) { + vals[i] = aggs[i].get(); + } + + final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey()); + resultBuilder.addEntry(key, key, vals); + } + } + + if (nullValueAggregates != null) { + Object[] nullVals = new Object[nullValueAggregates.length]; + for (int i = 0; i < nullValueAggregates.length; i++) { + nullVals[i] = nullValueAggregates[i].get(); + } + + resultBuilder.addEntry(null, null, nullVals); + } + } + + @Override + public void closeAggregators() + { + for (Aggregator[] aggregators : getAggregatesStore().values()) { + for (Aggregator agg : aggregators) { + agg.close(); + } + } + + if (nullValueAggregates != null) { + for (Aggregator nullAgg : nullValueAggregates) { + nullAgg.close(); + } + } + } +} diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java deleted file mode 100644 index e27baf4e2021..000000000000 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.query.topn.types; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.BaseTopNAlgorithm; -import org.apache.druid.query.topn.TopNParams; -import org.apache.druid.query.topn.TopNQuery; -import org.apache.druid.query.topn.TopNResultBuilder; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; -import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.Cursor; -import org.apache.druid.segment.DimensionHandlerUtils; -import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ValueType; - -import java.util.Map; -import java.util.function.Function; - -public abstract class NumericTopNColumnSelectorStrategy< - ValueSelectorType, - DimExtractionAggregateStoreType extends Map> - implements TopNColumnSelectorStrategy -{ - public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType) - { - final Function> converter = DimensionHandlerUtils.converterFromTypeToType( - selectorType, - dimensionType - ); - - switch (selectorType) { - case LONG: - return new OfLong(converter); - case FLOAT: - return new OfFloat(converter); - case DOUBLE: - return new OfDouble(converter); - default: - throw new IAE("No strategy for type[%s]", selectorType); - } - } - - @Override - public int getCardinality(ValueSelectorType selector) - { - return TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN; - } - - @Override - public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) - { - return null; - } - - static long 
floatDimExtractionScanAndAggregate( - TopNQuery query, - BaseFloatColumnValueSelector selector, - Cursor cursor, - Int2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - int key = Float.floatToIntBits(selector.getFloat()); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - static long doubleDimExtractionScanAndAggregate( - TopNQuery query, - BaseDoubleColumnValueSelector selector, - Cursor cursor, - Long2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - long key = Double.doubleToLongBits(selector.getDouble()); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - static long longDimExtractionScanAndAggregate( - TopNQuery query, - BaseLongColumnValueSelector selector, - Cursor cursor, - Long2ObjectMap aggregatesStore - ) - { - long processedRows = 0; - while (!cursor.isDone()) { - long key = selector.getLong(); - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - cursor.advance(); - processedRows++; - } - return processedRows; - } - - @Override - public void updateDimExtractionResults( - 
final DimExtractionAggregateStoreType aggregatesStore, - final TopNResultBuilder resultBuilder - ) - { - for (Map.Entry entry : aggregatesStore.entrySet()) { - Aggregator[] aggs = entry.getValue(); - if (aggs != null) { - Object[] vals = new Object[aggs.length]; - for (int i = 0; i < aggs.length; i++) { - vals[i] = aggs[i].get(); - } - - final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey()); - resultBuilder.addEntry(key, key, vals); - } - } - } - - abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey); - - static class OfFloat - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfFloat(final Function> converter) - { - this.converter = converter; - } - - @Override - public Int2ObjectMap makeDimExtractionAggregateStore() - { - return new Int2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey)); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseFloatColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Int2ObjectMap aggregatesStore - ) - { - return floatDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } - - static class OfLong - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfLong(final Function> converter) - { - this.converter = converter; - } - - @Override - public Long2ObjectMap makeDimExtractionAggregateStore() - { - return new Long2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(aggregatorStoreKey); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseLongColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Long2ObjectMap 
aggregatesStore - ) - { - return longDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } - - static class OfDouble - extends NumericTopNColumnSelectorStrategy> - { - private final Function> converter; - - OfDouble(final Function> converter) - { - this.converter = converter; - } - - @Override - public Long2ObjectMap makeDimExtractionAggregateStore() - { - return new Long2ObjectOpenHashMap<>(); - } - - @Override - Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey) - { - return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey)); - } - - @Override - public long dimExtractionScanAndAggregate( - TopNQuery query, - BaseDoubleColumnValueSelector selector, - Cursor cursor, - Aggregator[][] rowSelector, - Long2ObjectMap aggregatesStore - ) - { - return doubleDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore); - } - } -} diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java similarity index 78% rename from processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index f5e838d42b60..2d75e4c3f0d8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -36,12 +36,12 @@ import java.util.Map; import java.util.function.Function; -public class StringTopNColumnSelectorStrategy - implements TopNColumnSelectorStrategy, Aggregator[]>> +public class StringTopNColumnAggregatesProcessor implements HeapBasedTopNColumnAggregatesProcessor { private final Function> dimensionValueConverter; + private HashMap, Aggregator[]> aggregatesStore; - 
public StringTopNColumnSelectorStrategy(final ValueType dimensionType) + public StringTopNColumnAggregatesProcessor(final ValueType dimensionType) { this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType); } @@ -53,7 +53,7 @@ public int getCardinality(DimensionSelector selector) } @Override - public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) + public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter) { if (params.getCardinality() < 0) { throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); @@ -74,53 +74,59 @@ public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams pa } @Override - public Map, Aggregator[]> makeDimExtractionAggregateStore() + public void updateResults(TopNResultBuilder resultBuilder) { - return new HashMap<>(); + for (Map.Entry entry : aggregatesStore.entrySet()) { + Aggregator[] aggs = entry.getValue(); + if (aggs != null) { + Object[] vals = new Object[aggs.length]; + for (int i = 0; i < aggs.length; i++) { + vals[i] = aggs[i].get(); + } + + final Comparable key = dimensionValueConverter.apply(entry.getKey()); + resultBuilder.addEntry(key, key, vals); + } + } + } + + @Override + public void closeAggregators() + { + for (Aggregator[] aggregators : aggregatesStore.values()) { + for (Aggregator agg : aggregators) { + agg.close(); + } + } } @Override - public long dimExtractionScanAndAggregate( + public long scanAndAggregate( TopNQuery query, DimensionSelector selector, Cursor cursor, - Aggregator[][] rowSelector, - Map, Aggregator[]> aggregatesStore + Aggregator[][] rowSelector ) { + initAggregateStore(); if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { - return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore); + return 
dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); } else { - return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore); + return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector); } } @Override - public void updateDimExtractionResults( - final Map, Aggregator[]> aggregatesStore, - final TopNResultBuilder resultBuilder - ) + public void initAggregateStore() { - for (Map.Entry, Aggregator[]> entry : aggregatesStore.entrySet()) { - Aggregator[] aggs = entry.getValue(); - if (aggs != null) { - Object[] vals = new Object[aggs.length]; - for (int i = 0; i < aggs.length; i++) { - vals[i] = aggs[i].get(); - } - - final Comparable key = dimensionValueConverter.apply(entry.getKey()); - resultBuilder.addEntry(key, key, vals); - } - } + this.aggregatesStore = new HashMap<>(); } private long dimExtractionScanAndAggregateWithCardinalityKnown( TopNQuery query, Cursor cursor, DimensionSelector selector, - Aggregator[][] rowSelector, - Map, Aggregator[]> aggregatesStore + Aggregator[][] rowSelector ) { long processedRows = 0; @@ -152,8 +158,7 @@ private long dimExtractionScanAndAggregateWithCardinalityKnown( private long dimExtractionScanAndAggregateWithCardinalityUnknown( TopNQuery query, Cursor cursor, - DimensionSelector selector, - Map, Aggregator[]> aggregatesStore + DimensionSelector selector ) { long processedRows = 0; @@ -162,13 +167,12 @@ private long dimExtractionScanAndAggregateWithCardinalityUnknown( for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); final Comparable key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - - Aggregator[] theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); + Aggregator[] aggs = aggregatesStore.get(key); + if (aggs == 
null) { + aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + aggregatesStore.put(key, aggs); } - for (Aggregator aggregator : theAggregators) { + for (Aggregator aggregator : aggs) { aggregator.aggregate(); } } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java deleted file mode 100644 index 8cc820d04ba7..000000000000 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.query.topn.types; - -import com.google.common.base.Preconditions; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.column.ValueType; - -public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory -{ - private final ValueType dimensionType; - - public TopNColumnSelectorStrategyFactory(final ValueType dimensionType) - { - this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); - } - - @Override - public TopNColumnSelectorStrategy makeColumnSelectorStrategy( - ColumnCapabilities capabilities, - ColumnValueSelector selector - ) - { - final ValueType selectorType = capabilities.getType(); - - switch (selectorType) { - case STRING: - // Return strategy that reads strings and outputs dimensionTypes. - return new StringTopNColumnSelectorStrategy(dimensionType); - case LONG: - case FLOAT: - case DOUBLE: - // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using - // a numeric type and then converts to the desired output type after aggregating. We must be careful not to - // convert to an output type that cannot represent all possible values of the input type. - - if (ValueType.isNumeric(dimensionType)) { - // Return strategy that aggregates using the _output_ type, because this allows us to collapse values - // properly (numeric types cannot always represent all values of other numeric types). - return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType); - } else { - // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can - // represent all possible values of the input type. This will be true for STRING, which is the only - // non-numeric type currently supported. 
- return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType); - } - default: - throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType); - } - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 895b452e47c7..b58010d3701e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -2344,6 +2344,126 @@ public void testNullFloatFilter() throws Exception ); } + @Test + public void testNullDoubleTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{1.7, 1L}, + new Object[]{1.0, 1L}, + new Object[]{0.0, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{1.7, 1L}, + new Object[]{1.0, 1L}, + new Object[]{0.0, 1L} + ); + } + testQuery( + "SELECT d1, COUNT(*) FROM druid.numfoo GROUP BY d1 ORDER BY d1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("d1", "_d0", ValueType.DOUBLE)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + + @Test + public void testNullFloatTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{1.0f, 1L}, + new Object[]{0.1f, 1L}, + new Object[]{0.0f, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{1.0f, 1L}, + new Object[]{0.1f, 1L}, + new Object[]{0.0f, 1L} + ); + } + testQuery( + "SELECT f1, COUNT(*) FROM druid.numfoo GROUP 
BY f1 ORDER BY f1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("f1", "_d0", ValueType.FLOAT)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + + @Test + public void testNullLongTopN() throws Exception + { + List expected; + if (useDefault) { + expected = ImmutableList.of( + new Object[]{325323L, 1L}, + new Object[]{7L, 1L}, + new Object[]{0L, 4L} + ); + } else { + expected = ImmutableList.of( + new Object[]{null, 3L}, + new Object[]{325323L, 1L}, + new Object[]{7L, 1L}, + new Object[]{0L, 1L} + ); + } + testQuery( + "SELECT l1, COUNT(*) FROM druid.numfoo GROUP BY l1 ORDER BY l1 DESC LIMIT 10", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("l1", "_d0", ValueType.LONG)) + .threshold(10) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .metric( + new InvertedTopNMetricSpec( + new DimensionTopNMetricSpec(null, StringComparators.NUMERIC) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expected + ); + } + @Test public void testEmptyStringEquality() throws Exception { From c6927517bc6848ccd6a1cc3341586fad8d6dc318 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 16 Jan 2020 03:26:29 -0800 Subject: [PATCH 2/7] adjustments --- ...rithm.java => HeapBasedTopNAlgorithm.java} | 29 +++++----- .../topn/TimeExtractionTopNAlgorithm.java | 13 ++--- .../druid/query/topn/TopNAlgorithm.java | 4 +- .../apache/druid/query/topn/TopNMapFn.java | 15 +++-- .../apache/druid/query/topn/TopNParams.java | 8 +-- 
.../druid/query/topn/TopNQueryEngine.java | 24 +++----- ...eNumericTopNColumnAggregatesProcessor.java | 2 +- .../StringTopNColumnAggregatesProcessor.java | 2 +- ...ava => TopNColumnAggregatesProcessor.java} | 57 +++++++++++++++---- ...=> TopNColumnSelectorStrategyFactory.java} | 8 +-- .../druid/segment/DimensionHandlerUtils.java | 43 +++++++------- .../apache/druid/segment/VirtualColumns.java | 5 ++ 12 files changed, 121 insertions(+), 89 deletions(-) rename processing/src/main/java/org/apache/druid/query/topn/{DimExtractionTopNAlgorithm.java => HeapBasedTopNAlgorithm.java} (70%) rename processing/src/main/java/org/apache/druid/query/topn/types/{HeapBasedTopNColumnAggregatesProcessor.java => TopNColumnAggregatesProcessor.java} (54%) rename processing/src/main/java/org/apache/druid/query/topn/types/{HeapBasedTopNColumnAggregatesProcessorFactory.java => TopNColumnSelectorStrategyFactory.java} (91%) diff --git a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java similarity index 70% rename from processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java rename to processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java index 823397bbc813..5ea1ccda0b43 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java @@ -21,19 +21,22 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; /** - * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot 
handle multiple index numerals referencing the same dimension value. + * Heap based topn algorithm that handles aggregates on dimension extractions and numeric typed dimension columns. + * + * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle + * multiple index numerals referencing the same dimension value. */ -public class DimExtractionTopNAlgorithm - extends BaseTopNAlgorithm +public class HeapBasedTopNAlgorithm + extends BaseTopNAlgorithm { private final TopNQuery query; - public DimExtractionTopNAlgorithm( + public HeapBasedTopNAlgorithm( StorageAdapter storageAdapter, TopNQuery query ) @@ -45,7 +48,7 @@ public DimExtractionTopNAlgorithm( @Override public TopNParams makeInitParams( - final ColumnSelectorPlus selectorPlus, + final ColumnSelectorPlus selectorPlus, final Cursor cursor ) { @@ -62,7 +65,7 @@ protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed, if (params.getCardinality() < 0) { throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); } - ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter); } @@ -73,9 +76,9 @@ protected Aggregator[][] updateDimValSelector(Aggregator[][] aggregators, int nu } @Override - protected HeapBasedTopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params) + protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params) { - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); return selectorPlus.getColumnSelectorStrategy(); } @@ -83,11 +86,11 @@ protected HeapBasedTopNColumnAggregatesProcessor makeDimValAggregateStore(TopNPa protected long scanAndAggregate( TopNParams params, Aggregator[][] rowSelector, - 
HeapBasedTopNColumnAggregatesProcessor processor + TopNColumnAggregatesProcessor processor ) { final Cursor cursor = params.getCursor(); - final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); return processor.scanAndAggregate( query, @@ -101,7 +104,7 @@ protected long scanAndAggregate( protected void updateResults( TopNParams params, Aggregator[][] aggregators, - HeapBasedTopNColumnAggregatesProcessor processor, + TopNColumnAggregatesProcessor processor, TopNResultBuilder resultBuilder ) { @@ -109,7 +112,7 @@ protected void updateResults( } @Override - protected void closeAggregators(HeapBasedTopNColumnAggregatesProcessor processor) + protected void closeAggregators(TopNColumnAggregatesProcessor processor) { processor.closeAggregators(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 5a244608b892..a23518a4aa90 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -31,7 +31,7 @@ import java.util.Map; import java.util.function.Function; -public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm, TopNParams> +public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm, Aggregator[]>, TopNParams> { private static final int[] EMPTY_INTS = new int[]{}; @@ -75,8 +75,7 @@ protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int } @Override - @SuppressWarnings("unchecked") - protected Map makeDimValAggregateStore(TopNParams params) + protected Map, Aggregator[]> makeDimValAggregateStore(TopNParams params) { return new HashMap<>(); } @@ -85,7 +84,7 @@ protected Map makeDimValAggregateStore(TopNParams para protected long scanAndAggregate( TopNParams params, int[] 
dimValSelector, - Map aggregatesStore + Map, Aggregator[]> aggregatesStore ) { if (params.getCardinality() < 0) { @@ -119,11 +118,11 @@ protected long scanAndAggregate( protected void updateResults( TopNParams params, int[] dimValSelector, - Map aggregatesStore, + Map, Aggregator[]> aggregatesStore, TopNResultBuilder resultBuilder ) { - for (Map.Entry entry : aggregatesStore.entrySet()) { + for (Map.Entry, Aggregator[]> entry : aggregatesStore.entrySet()) { Aggregator[] aggs = entry.getValue(); if (aggs != null) { Object[] vals = new Object[aggs.length]; @@ -141,7 +140,7 @@ protected void updateResults( } @Override - protected void closeAggregators(Map stringMap) + protected void closeAggregators(Map, Aggregator[]> stringMap) { for (Aggregator[] aggregators : stringMap.values()) { for (Aggregator agg : aggregators) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java index bb2d2e0d4450..31b4d9204e8d 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java @@ -21,7 +21,7 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import javax.annotation.Nullable; @@ -34,7 +34,7 @@ public interface TopNAlgorithm int INIT_POSITION_VALUE = -1; int SKIP_POSITION_VALUE = -2; - TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); + TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); void run( Parameters params, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java index 
98dd6be6176a..101f78f3ec69 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java @@ -21,7 +21,8 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.Result; -import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessorFactory; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; +import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionHandlerUtils; @@ -42,13 +43,15 @@ public TopNMapFn( } @SuppressWarnings("unchecked") + @Nullable public Result apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics) { - final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( - new HeapBasedTopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()), - query.getDimensionSpec(), - cursor.getColumnSelectorFactory() - ); + final ColumnSelectorPlus> selectorPlus = + DimensionHandlerUtils.createColumnSelectorPlus( + new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()), + query.getDimensionSpec(), + cursor.getColumnSelectorFactory() + ); if (selectorPlus.getSelector() == null) { return null; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java index 389c6a1cf244..cdc541f99248 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java @@ -20,7 +20,7 @@ package org.apache.druid.query.topn; import org.apache.druid.query.ColumnSelectorPlus; -import org.apache.druid.query.topn.types.HeapBasedTopNColumnAggregatesProcessor; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.segment.Cursor; import 
org.apache.druid.segment.DimensionSelector; @@ -32,10 +32,10 @@ public class TopNParams private final Cursor cursor; private final int cardinality; private final int numValuesPerPass; - private final ColumnSelectorPlus selectorPlus; + private final ColumnSelectorPlus selectorPlus; protected TopNParams( - ColumnSelectorPlus selectorPlus, + ColumnSelectorPlus selectorPlus, Cursor cursor, int numValuesPerPass ) @@ -53,7 +53,7 @@ public DimensionSelector getDimSelector() return (DimensionSelector) selectorPlus.getSelector(); } - public ColumnSelectorPlus getSelectorPlus() + public ColumnSelectorPlus getSelectorPlus() { return selectorPlus; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 0b2fb487354e..f785775fdd29 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -19,7 +19,6 @@ package org.apache.druid.query.topn; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import org.apache.druid.collections.NonBlockingPool; @@ -30,7 +29,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.Filter; -import org.apache.druid.segment.Cursor; import org.apache.druid.segment.SegmentMissingException; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnCapabilities; @@ -86,16 +84,11 @@ public Sequence> query( query.isDescending(), queryMetrics ), - new Function>() - { - @Override - public Result apply(Cursor input) - { - if (queryMetrics != null) { - queryMetrics.cursor(input); - } - return mapFn.apply(input, queryMetrics); + input -> { + if (queryMetrics != null) { + queryMetrics.cursor(input); } + return mapFn.apply(input, 
queryMetrics); } ), Predicates.notNull() @@ -125,7 +118,8 @@ private TopNMapFn getMapFn( final ColumnCapabilities columnCapabilities = query.getVirtualColumns() .getColumnCapabilitiesWithFallback(adapter, dimension); - final TopNAlgorithm topNAlgorithm; + + final TopNAlgorithm topNAlgorithm; if ( selector.isHasExtractionFn() && // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long. @@ -137,15 +131,15 @@ private TopNMapFn getMapFn( // currently relies on the dimension cardinality to support lexicographic sorting topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query); } else if (selector.isHasExtractionFn()) { - topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings. - topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) { // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be // a many-to-one mapping, since numeric types can't represent all possible values of other types.) 
- topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query); + topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (selector.isAggregateAllMetrics()) { topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool); } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index 72c573d70d3c..ebc3c04ffb57 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -33,7 +33,7 @@ import java.util.function.Function; public abstract class NullableNumericTopNColumnAggregatesProcessor - implements HeapBasedTopNColumnAggregatesProcessor + implements TopNColumnAggregatesProcessor { private final boolean hasNulls = !NullHandling.replaceWithDefault(); final Function> converter; diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 2d75e4c3f0d8..4c961dbadf4f 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -36,7 +36,7 @@ import java.util.Map; import java.util.function.Function; -public class StringTopNColumnAggregatesProcessor implements HeapBasedTopNColumnAggregatesProcessor +public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor { private final Function> dimensionValueConverter; private HashMap, Aggregator[]> 
aggregatesStore; diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java similarity index 54% rename from processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java index 4cb5a9e1fa47..34e448f1c1a8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java @@ -21,23 +21,39 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.dimension.ColumnSelectorStrategy; +import org.apache.druid.query.topn.HeapBasedTopNAlgorithm; import org.apache.druid.query.topn.TopNParams; import org.apache.druid.query.topn.TopNQuery; import org.apache.druid.query.topn.TopNResultBuilder; +import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; -public interface HeapBasedTopNColumnAggregatesProcessor extends ColumnSelectorStrategy +import javax.annotation.Nullable; + +/** + * This {@link ColumnSelectorStrategy} is used by all {@link org.apache.druid.query.topn.TopNAlgorithm} to provide + * selector value cardinality to {@link TopNParams} (perhaps unnecessarily, but that is another matter), but is primarily + * used by {@link HeapBasedTopNAlgorithm} to serve as its value aggregates store. + * + * Given a query, column value selector, and cursor to process, the aggregates store is populated by calling + * {@link #scanAndAggregate} and can be applied to {@link TopNResultBuilder} through {@link #updateResults}.
+ */ +public interface TopNColumnAggregatesProcessor extends ColumnSelectorStrategy { + /** + * Get value cardinality of underlying {@link ColumnValueSelector} + */ int getCardinality(ValueSelectorType selector); /** - * Used by DimExtractionTopNAlgorithm. + * Used by {@link HeapBasedTopNAlgorithm}. * - * Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters. + * Create an Aggregator[][] using {@link org.apache.druid.query.topn.BaseTopNAlgorithm.AggregatorArrayProvider} and + * the given parameters. * * As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types - * that use integer row values. + * that use integer row values, e.g. string columns where the value cardinality is known. * * A dimension type that does not have integer values should return null. * @@ -48,20 +64,23 @@ public interface HeapBasedTopNColumnAggregatesProcessor exten * * @return an Aggregator[][] for integer-valued dimensions, null otherwise */ + @Nullable Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter); /** - * Used by DimExtractionTopNAlgorithm. + * Used by {@link HeapBasedTopNAlgorithm}. * - * Iterate through the cursor, reading the current row from a dimension value selector, and for each row value: - * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from - * aggregatesStore (slower map). + * Iterate through the {@link Cursor}, reading the current row from a dimension value selector, and for each row + * value: + * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup), usable if value cardinality + * is known, or from aggregatesStore (slower map). * - * 2. 
If the rowSelector and/or aggregatesStore did not have an entry for a particular row value, - * this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the - * provided cursor and query, storing them in rowSelector and aggregatesStore + * 2. If the rowSelector/aggregatesStore did not have an entry for a particular row value, this function + * should retrieve the current Aggregator[] using + * {@link org.apache.druid.query.topn.BaseTopNAlgorithm#makeAggregators} and the provided cursor and query, + * storing them in rowSelector/aggregatesStore * - * 3. Call aggregate() on each of the aggregators. + * 3. Call {@link Aggregator#aggregate()} on each of the aggregators. * * If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only. * @@ -79,9 +98,23 @@ long scanAndAggregate( Aggregator[][] rowSelector ); + /** + * Used by {@link HeapBasedTopNAlgorithm}. + * + * Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the + * valueTransformer to the keys if present + * + * @param resultBuilder TopN result builder + */ void updateResults(TopNResultBuilder resultBuilder); + /** + * Initializes the underlying aggregates store + */ void initAggregateStore(); + /** + * Closes all on heap {@link Aggregator} associated with the aggregates processor + */ void closeAggregators(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java similarity index 91% rename from processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessorFactory.java rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java index bd3ab82df377..670cc49adb70 100644 ---
a/processing/src/main/java/org/apache/druid/query/topn/types/HeapBasedTopNColumnAggregatesProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java @@ -29,18 +29,18 @@ import java.util.function.Function; -public class HeapBasedTopNColumnAggregatesProcessorFactory - implements ColumnSelectorStrategyFactory +public class TopNColumnSelectorStrategyFactory + implements ColumnSelectorStrategyFactory> { private final ValueType dimensionType; - public HeapBasedTopNColumnAggregatesProcessorFactory(final ValueType dimensionType) + public TopNColumnSelectorStrategyFactory(final ValueType dimensionType) { this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); } @Override - public HeapBasedTopNColumnAggregatesProcessor makeColumnSelectorStrategy( + public TopNColumnAggregatesProcessor makeColumnSelectorStrategy( ColumnCapabilities capabilities, ColumnValueSelector selector ) diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java index e9396c504372..c8a49e7c8057 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java @@ -64,9 +64,9 @@ private DimensionHandlerUtils() .setDictionaryEncoded(true) .setHasBitmapIndexes(true); - public static DimensionHandler getHandlerFromCapabilities( + public static DimensionHandler getHandlerFromCapabilities( String dimensionName, - ColumnCapabilities capabilities, + @Nullable ColumnCapabilities capabilities, @Nullable MultiValueHandling multiValueHandling ) { @@ -113,15 +113,15 @@ public static List getValueTypesFromDimensionSpecs(List The strategy type created by the provided strategy factory. + * @param The strategy type created by the provided strategy factory. 
* @param strategyFactory A factory provided by query engines that generates type-handling strategies * @param dimensionSpec column to generate a ColumnSelectorPlus object for * @param cursor Used to create value selectors for columns. * * @return A ColumnSelectorPlus object */ - public static ColumnSelectorPlus createColumnSelectorPlus( - ColumnSelectorStrategyFactory strategyFactory, + public static ColumnSelectorPlus createColumnSelectorPlus( + ColumnSelectorStrategyFactory strategyFactory, DimensionSpec dimensionSpec, ColumnSelectorFactory cursor ) @@ -140,39 +140,36 @@ public static Colum * A caller should define a strategy factory that provides an interface for type-specific operations * in a query engine. See GroupByStrategyFactory for a reference. * - * @param The strategy type created by the provided strategy factory. + * @param The strategy type created by the provided strategy factory. * @param strategyFactory A factory provided by query engines that generates type-handling strategies * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for * @param columnSelectorFactory Used to create value selectors for columns. 
* * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs */ - public static - //CHECKSTYLE.OFF: Indentation - ColumnSelectorPlus[] createColumnSelectorPluses( - //CHECKSTYLE.ON: Indentation - ColumnSelectorStrategyFactory strategyFactory, + public static ColumnSelectorPlus[] createColumnSelectorPluses( + ColumnSelectorStrategyFactory strategyFactory, List dimensionSpecs, ColumnSelectorFactory columnSelectorFactory ) { int dimCount = dimensionSpecs.size(); @SuppressWarnings("unchecked") - ColumnSelectorPlus[] dims = new ColumnSelectorPlus[dimCount]; + ColumnSelectorPlus[] dims = new ColumnSelectorPlus[dimCount]; for (int i = 0; i < dimCount; i++) { final DimensionSpec dimSpec = dimensionSpecs.get(i); final String dimName = dimSpec.getDimension(); - final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( + final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( dimSpec, columnSelectorFactory ); - ColumnSelectorStrategyClass strategy = makeStrategy( + Strategy strategy = makeStrategy( strategyFactory, dimSpec, columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()), selector ); - final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( + final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( dimName, dimSpec.getOutputName(), strategy, @@ -183,7 +180,7 @@ ColumnSelectorPlus[] createColumnSelectorPluses( return dims; } - private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( + private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( DimensionSpec dimSpec, ColumnSelectorFactory columnSelectorFactory ) @@ -191,12 +188,10 @@ private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( String dimName = dimSpec.getDimension(); ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName); capabilities = getEffectiveCapabilities(dimSpec, capabilities); - switch 
(capabilities.getType()) { - case STRING: - return columnSelectorFactory.makeDimensionSelector(dimSpec); - default: - return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension()); + if (capabilities.getType() == ValueType.STRING) { + return columnSelectorFactory.makeDimensionSelector(dimSpec); } + return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension()); } /** @@ -235,11 +230,11 @@ private static ColumnCapabilities getEffectiveCapabilities( return capabilities; } - private static ColumnSelectorStrategyClass makeStrategy( - ColumnSelectorStrategyFactory strategyFactory, + private static Strategy makeStrategy( + ColumnSelectorStrategyFactory strategyFactory, DimensionSpec dimSpec, @Nullable ColumnCapabilities capabilities, - ColumnValueSelector selector + ColumnValueSelector selector ) { capabilities = getEffectiveCapabilities(dimSpec, capabilities); diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java index 9802aadd661b..b9866ac07422 100644 --- a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java +++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java @@ -141,6 +141,7 @@ public boolean exists(String columnName) return getVirtualColumn(columnName) != null; } + @Nullable public VirtualColumn getVirtualColumn(String columnName) { final VirtualColumn vc = withoutDotSupport.get(columnName); @@ -184,6 +185,7 @@ public BitmapIndex getBitmapIndex(String columnName, ColumnSelector columnSelect } } + @Nullable public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, ColumnSelector columnSelector, ReadableOffset offset) { final VirtualColumn virtualColumn = getVirtualColumn(dimensionSpec.getDimension()); @@ -194,6 +196,7 @@ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, Colu } } + @Nullable public ColumnValueSelector makeColumnValueSelector(String 
columnName, ColumnSelector columnSelector, ReadableOffset offset) { final VirtualColumn virtualColumn = getVirtualColumn(columnName); @@ -226,6 +229,7 @@ public ColumnValueSelector makeColumnValueSelector(String columnName, ColumnS } } + @Nullable public ColumnCapabilities getColumnCapabilities(String columnName) { final VirtualColumn virtualColumn = getVirtualColumn(columnName); @@ -240,6 +244,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) } } + @Nullable public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName) { final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName); From 515af98ae6a366028cdcefb554d42294bc46d034 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 16 Jan 2020 14:44:22 -0800 Subject: [PATCH 3/7] rename --- .../topn/types/StringTopNColumnAggregatesProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 4c961dbadf4f..04ebc145dd11 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -110,9 +110,9 @@ public long scanAndAggregate( { initAggregateStore(); if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { - return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); + return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); } else { - return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector); + return scanAndAggregateWithCardinalityUnknown(query, cursor, selector); } } @@ -122,7 +122,7 @@ public void initAggregateStore() 
this.aggregatesStore = new HashMap<>(); } - private long dimExtractionScanAndAggregateWithCardinalityKnown( + private long scanAndAggregateWithCardinalityKnown( TopNQuery query, Cursor cursor, DimensionSelector selector, @@ -155,7 +155,7 @@ private long dimExtractionScanAndAggregateWithCardinalityKnown( return processedRows; } - private long dimExtractionScanAndAggregateWithCardinalityUnknown( + private long scanAndAggregateWithCardinalityUnknown( TopNQuery query, Cursor cursor, DimensionSelector selector From 777bf78b02241314f609feb192502eb03146b1b6 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 16 Jan 2020 17:47:38 -0800 Subject: [PATCH 4/7] add more tests --- .../druid/query/topn/TopNQueryRunnerTest.java | 126 ++++++++++++++++-- 1 file changed, 112 insertions(+), 14 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 0ba5860dc74a..7340da59037e 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -100,6 +100,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -5832,9 +5833,9 @@ public void test_topN_orderByLongNumericColumnWithNulls_returnsDescendingResults .put("index_alias", 147L) .put("longNumericNull", 10L) .build(), - makeNumericNullRowHelper("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()), - makeNumericNullRowHelper("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()), - makeNumericNullRowHelper("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue()) + makeRowWithNulls("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()), + makeRowWithNulls("index_alias", 126L, 
"longNumericNull", NullHandling.defaultLongValue()), + makeRowWithNulls("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue()) ) ) ) @@ -5900,9 +5901,9 @@ public void test_topN_orderByFloatNumericColumnWithNulls_returnsDescendingResult .put("index_alias", 147L) .put("floatNumericNull", 10f) .build(), - makeNumericNullRowHelper("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()), - makeNumericNullRowHelper("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()), - makeNumericNullRowHelper("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue()) + makeRowWithNulls("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()), + makeRowWithNulls("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()), + makeRowWithNulls("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue()) ) ) ) @@ -5968,9 +5969,9 @@ public void test_topN_orderByDoubleNumericColumnWithNulls_returnsDescendingResul .put("index_alias", 147L) .put("doubleNumericNull", 10d) .build(), - makeNumericNullRowHelper("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()), - makeNumericNullRowHelper("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()), - makeNumericNullRowHelper("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue()) + makeRowWithNulls("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()), + makeRowWithNulls("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()), + makeRowWithNulls("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue()) ) ) ) @@ -5978,16 +5979,113 @@ public void test_topN_orderByDoubleNumericColumnWithNulls_returnsDescendingResul assertExpectedResults(expectedResults, query); } - private static Map makeNumericNullRowHelper( + + @Test + public void testAggregateOnLongNumericNull() + { + TopNQuery query = new 
TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("longNumericNull", "dim", ValueType.LONG)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L), + makeRowWithNulls("dim", 10L, "count", 93L), + makeRowWithNulls("dim", 20L, "count", 93L), + makeRowWithNulls("dim", 40L, "count", 93L), + makeRowWithNulls("dim", 50L, "count", 279L), + makeRowWithNulls("dim", 70L, "count", 279L), + makeRowWithNulls("dim", 80L, "count", 93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testAggregateOnDoubleNumericNull() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("doubleNumericNull", "dim", ValueType.DOUBLE)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L), + makeRowWithNulls("dim", 10.0, "count", 93L), + makeRowWithNulls("dim", 20.0, "count", 93L), + makeRowWithNulls("dim", 40.0, "count", 93L), + makeRowWithNulls("dim", 50.0, "count", 279L), + makeRowWithNulls("dim", 70.0, "count", 279L), + makeRowWithNulls("dim", 80.0, "count", 
93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testAggregateOnFloatNumericNull() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(new DefaultDimensionSpec("floatNumericNull", "dim", ValueType.FLOAT)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(10000) + .aggregators(new CountAggregatorFactory("count")) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.asList( + makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L), + makeRowWithNulls("dim", 10.0f, "count", 93L), + makeRowWithNulls("dim", 20.0f, "count", 93L), + makeRowWithNulls("dim", 40.0f, "count", 93L), + makeRowWithNulls("dim", 50.0f, "count", 279L), + makeRowWithNulls("dim", 70.0f, "count", 279L), + makeRowWithNulls("dim", 80.0f, "count", 93L) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + private static Map makeRowWithNulls( String dimName, - Object dimValue, - String nameOfColumnWithNull, - Object defaultNullValue + @Nullable Object dimValue, + String metric, + @Nullable Object metricVal ) { Map nullRow = new HashMap<>(); nullRow.put(dimName, dimValue); - nullRow.put(nameOfColumnWithNull, defaultNullValue); + nullRow.put(metric, metricVal); return nullRow; } } From ac2d66ee6aeab38263bd6f4ef276d4522140964e Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 16 Jan 2020 18:40:04 -0800 Subject: [PATCH 5/7] fix comments --- .../java/org/apache/druid/query/topn/TopNQueryEngine.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 
f785775fdd29..dfe180971baa 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -134,17 +134,20 @@ private TopNMapFn getMapFn( topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { - // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings. + // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings. topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) { - // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be + // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be // a many-to-one mapping, since numeric types can't represent all possible values of other types.) 
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (selector.isAggregateAllMetrics()) { + // sorted by dimension topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool); } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) { + // high cardinality dimensions with larger result sets topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool); } else { + // anything else topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool); } if (queryMetrics != null) { From 6130e6a5ca196644aa59f0e0708437c93699e089 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 16 Jan 2020 19:05:32 -0800 Subject: [PATCH 6/7] more javadocs --- .../query/topn/HeapBasedTopNAlgorithm.java | 1 + ...eNumericTopNColumnAggregatesProcessor.java | 21 ++++++++++++++++++- .../StringTopNColumnAggregatesProcessor.java | 1 - .../types/TopNColumnAggregatesProcessor.java | 5 +++-- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java index 5ea1ccda0b43..87f79562fab3 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java @@ -92,6 +92,7 @@ protected long scanAndAggregate( final Cursor cursor = params.getCursor(); final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); + processor.initAggregateStore(); return processor.scanAndAggregate( query, selectorPlus.getSelector(), diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index ebc3c04ffb57..0b8e90c1659f 100644 --- 
a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -32,6 +32,15 @@ import java.util.Map; import java.util.function.Function; +/** + * Base {@link TopNColumnAggregatesProcessor} for {@link BaseNullableColumnValueSelector}. Non-null selector values + * aggregates are stored in a type appropriate primitive map, created by {@link #initAggregateStore()} and available + * via {@link #getAggregatesStore()}, and null valued row aggregates are stored in a separate + * {@link #nullValueAggregates} {@link Aggregator} array. + * + * {@link #updateResults} will combine both the map and null aggregates to populate the {@link TopNResultBuilder} with + * the values produced by {@link #scanAndAggregate}. + */ public abstract class NullableNumericTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor { @@ -44,10 +53,21 @@ protected NullableNumericTopNColumnAggregatesProcessor(Function getAggregatesStore(); + /** + * Method to convert primitive numeric value keys used by {@link #getAggregatesStore} into the correct representation + * for the {@link TopNResultBuilder}, called by {@link #updateResults} + */ abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey); @Override @@ -70,7 +90,6 @@ public long scanAndAggregate( Aggregator[][] rowSelector ) { - initAggregateStore(); long processedRows = 0; while (!cursor.isDone()) { if (hasNulls && selector.isNull()) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 04ebc145dd11..53b98e56767b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ 
b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -108,7 +108,6 @@ public long scanAndAggregate( Aggregator[][] rowSelector ) { - initAggregateStore(); if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); } else { diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java index 34e448f1c1a8..ac5b21ff123e 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java @@ -68,7 +68,8 @@ public interface TopNColumnAggregatesProcessor extends Column Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter); /** - * Used by {@link HeapBasedTopNAlgorithm}. + * Used by {@link HeapBasedTopNAlgorithm}. The contract of this method requires calling {@link #initAggregateStore()} + * prior to calling this method. 
* * Iterate through the {@link Cursor}, reading the current row from a dimension value selector, and for each row * value: @@ -109,7 +110,7 @@ long scanAndAggregate( void updateResults(TopNResultBuilder resultBuilder); /** - * Initializes the underlying aggregates store + * Initializes the underlying aggregates store to something nice and selector type appropriate */ void initAggregateStore(); From 3c9778ffadfa3b97d076ff3f1ef39ad6ea123305 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 17 Jan 2020 16:15:05 -0800 Subject: [PATCH 7/7] computeIfAbsent --- .../apache/druid/query/topn/TopNMapFn.java | 4 +-- .../DoubleTopNColumnAggregatesProcessor.java | 10 +++---- .../FloatTopNColumnAggregatesProcessor.java | 10 +++---- .../LongTopNColumnAggregatesProcessor.java | 10 +++---- .../StringTopNColumnAggregatesProcessor.java | 26 +++++++++---------- ...TopNColumnAggregatesProcessorFactory.java} | 4 +-- 6 files changed, 28 insertions(+), 36 deletions(-) rename processing/src/main/java/org/apache/druid/query/topn/types/{TopNColumnSelectorStrategyFactory.java => TopNColumnAggregatesProcessorFactory.java} (96%) diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java index 101f78f3ec69..96fb62f9012a 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java @@ -22,7 +22,7 @@ import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.Result; import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; -import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory; +import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionHandlerUtils; @@ -48,7 +48,7 @@ public Result apply(final Cursor cursor, final @Nullable TopNQu { final ColumnSelectorPlus>
selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( - new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()), + new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()), query.getDimensionSpec(), cursor.getColumnSelectorFactory() ); diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java index a47b9a82f51a..2a2781d45b97 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java @@ -48,12 +48,10 @@ Aggregator[] getValueAggregators( ) { long key = Double.doubleToLongBits(selector.getDouble()); - Aggregator[] aggs = aggregatesStore.get(key); - if (aggs == null) { - aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, aggs); - } - return aggs; + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java index 42c36a86b8f5..4e8dd8839d5b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java @@ -48,12 +48,10 @@ Aggregator[] getValueAggregators( ) { int key = Float.floatToIntBits(selector.getFloat()); - Aggregator[] aggs = aggregatesStore.get(key); - if (aggs == null) { - aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, aggs); - } - return 
aggs; + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java index 41b220ae7597..d28d8a813974 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java @@ -44,12 +44,10 @@ public LongTopNColumnAggregatesProcessor(Function> convert Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor) { long key = selector.getLong(); - Aggregator[] aggs = aggregatesStore.get(key); - if (aggs == null) { - aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, aggs); - } - return aggs; + return aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 53b98e56767b..cea7c775a758 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -133,18 +133,17 @@ private long scanAndAggregateWithCardinalityKnown( final IndexedInts dimValues = selector.getRow(); for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); - Aggregator[] theAggregators = rowSelector[dimIndex]; - if (theAggregators == null) { + Aggregator[] aggs = rowSelector[dimIndex]; + if (aggs 
== null) { final Comparable key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - rowSelector[dimIndex] = theAggregators; + aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + rowSelector[dimIndex] = aggs; } - for (Aggregator aggregator : theAggregators) { + for (Aggregator aggregator : aggs) { aggregator.aggregate(); } } @@ -166,11 +165,10 @@ private long scanAndAggregateWithCardinalityUnknown( for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); final Comparable key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - Aggregator[] aggs = aggregatesStore.get(key); - if (aggs == null) { - aggs = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, aggs); - } + Aggregator[] aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); for (Aggregator aggregator : aggs) { aggregator.aggregate(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java similarity index 96% rename from processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java index 670cc49adb70..56a29433fcb0 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java @@ 
-29,12 +29,12 @@ import java.util.function.Function; -public class TopNColumnSelectorStrategyFactory +public class TopNColumnAggregatesProcessorFactory implements ColumnSelectorStrategyFactory> { private final ValueType dimensionType; - public TopNColumnSelectorStrategyFactory(final ValueType dimensionType) + public TopNColumnAggregatesProcessorFactory(final ValueType dimensionType) { this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType"); }