diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 823f5f250a82..28399e0dbd35 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -227,7 +227,7 @@ public GroupByEngineIterator make() processingBuffer, fudgeTimestamp, dims, - isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()), + isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false), cardinalityForArrayAggregation ); } else { @@ -238,7 +238,7 @@ public GroupByEngineIterator make() processingBuffer, fudgeTimestamp, dims, - isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()) + isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false) ); } } @@ -313,12 +313,15 @@ public static int getCardinalityForArrayAggregation( } /** - * Checks whether all "dimensions" are either single-valued or nonexistent (which is just as good as single-valued, - * since their selectors will show up as full of nulls). + * Checks whether all "dimensions" are either single-valued, or if allowed, nonexistent. Since non-existent column + * selectors will show up as full of nulls they are effectively single valued, however they can also be null during + * broker merge, for example with an 'inline' datasource subquery. 'missingMeansNonexistent' is sort of a hack to let + * the vectorized engine, which only operates on actual segments, still work in this case for non-existent columns. 
*/ public static boolean isAllSingleValueDims( final Function<String, ColumnCapabilities> capabilitiesFunction, - final List<DimensionSpec> dimensions + final List<DimensionSpec> dimensions, + final boolean missingMeansNonexistent ) { return dimensions
- return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions()) + return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions(), true) && query.getDimensions().stream().allMatch(DimensionSpec::canVectorize) && query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize) && adapter.canVectorize(filter, query.getVirtualColumns(), false); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index dfe180971baa..1d1bccdd7b1b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -132,9 +132,10 @@ private TopNMapFn getMapFn( topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query); } else if (selector.isHasExtractionFn()) { topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); - } else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING + } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING && columnCapabilities.isDictionaryEncoded())) { - // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings. + // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know + // which can happen for 'inline' data sources when this is run on the broker topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query); } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) { // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. 
(It's like an extractionFn: there can be diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index c98c7768decd..f224b5f580e4 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -56,7 +56,11 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryHelper; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.MapSegmentWrangler; import org.apache.druid.segment.ReferenceCountingSegment; @@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest private static final String FOO = "foo"; private static final String BAR = "bar"; + private static final String MULTI = "multi"; private static final Interval INTERVAL = Intervals.of("2000/P1Y"); private static final String VERSION = "A"; @@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest .build() ); + private static final InlineDataSource MULTI_VALUE_INLINE = InlineDataSource.fromIterable( + ImmutableList.builder() + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "b"), 1}) + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "c"), 2}) + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"), 3}) + .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"), 4}) + .build(), + RowSignature.builder() + .addTimeColumn() + .add("s", ValueType.STRING) + .add("n", ValueType.LONG) + .build() + ); + @Rule 
public ExpectedException expectedException = ExpectedException.none(); @@ -399,6 +418,115 @@ public void testJoinOnGroupByOnTable() Assert.assertEquals(2, scheduler.getTotalReleased().get()); } + @Test + public void testGroupByOnScanMultiValue() + { + ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI) + .columns("s", "n") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.ETERNITY) + ) + ) + .legacy(false) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final GroupByQuery query = + GroupByQuery.builder() + .setDataSource(new QueryDataSource(subquery)) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s")) + .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n")) + .build(); + + testQuery( + query, + // GroupBy handles its own subqueries; only the inner one will go to the cluster. + ImmutableList.of( + ExpectedQuery.cluster(subquery), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{ImmutableList.of("a", "b"), 1}, + new Object[]{ImmutableList.of("a", "c"), 2}, + new Object[]{ImmutableList.of("b"), 3}, + new Object[]{ImmutableList.of("c"), 4} + ), + RowSignature.builder().add("s", null).add("n", null).build() + ) + ) + ) + ), + ImmutableList.of( + new Object[]{"a", 3L}, + new Object[]{"b", 4L}, + new Object[]{"c", 6L} + ) + ); + + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); + } + + @Test + public void testTopNScanMultiValue() + { + ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI) + .columns("s", "n") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.ETERNITY) + ) + ) + .legacy(false) + 
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + final TopNQuery query = + new TopNQueryBuilder().dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .dimension(DefaultDimensionSpec.of("s")) + .metric("sum_n") + .threshold(100) + .aggregators(new LongSumAggregatorFactory("sum_n", "n")) + .build(); + + testQuery( + query, + // GroupBy handles its own subqueries; only the inner one will go to the cluster. + ImmutableList.of( + ExpectedQuery.cluster(subquery), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{ImmutableList.of("a", "b"), 1}, + new Object[]{ImmutableList.of("a", "c"), 2}, + new Object[]{ImmutableList.of("b"), 3}, + new Object[]{ImmutableList.of("c"), 4} + ), + RowSignature.builder().add("s", null).add("n", null).build() + ) + ) + ) + ), + ImmutableList.of( + new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L}, + new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L}, + new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L} + ) + ); + + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); + } + @Test public void testJoinOnTableErrorCantInlineTable() { @@ -522,7 +650,8 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable