diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 4bc541f4a3c1..7994996bcf69 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -233,48 +233,33 @@ public CloseableIterator<Entry<Integer>> iterator(boolean sorted)
     return new CloseableIterator<Entry<Integer>>()
     {
-      int cur;
-      boolean findNext = false;
-
-      {
-        cur = findNext();
-      }
+      // initialize to the first used slot
+      private int next = findNext(-1);
 
       @Override
       public boolean hasNext()
       {
-        if (findNext) {
-          cur = findNext();
-          findNext = false;
-        }
-        return cur >= 0;
-      }
-
-      private int findNext()
-      {
-        for (int i = cur + 1; i < cardinalityWithMissingValue; i++) {
-          if (isUsedSlot(i)) {
-            return i;
-          }
-        }
-        return -1;
+        return next >= 0;
       }
 
       @Override
       public Entry<Integer> next()
       {
-        if (cur < 0) {
+        if (next < 0) {
           throw new NoSuchElementException();
         }
-        findNext = true;
+        final int current = next;
+        next = findNext(current);
 
         final Object[] values = new Object[aggregators.length];
-        final int recordOffset = cur * recordSize;
+        final int recordOffset = current * recordSize;
         for (int i = 0; i < aggregators.length; i++) {
           values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]);
         }
-        return new Entry<>(cur - 1, values);
+        // shift by -1 since values are initially shifted by +1 so they are all positive and
+        // GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1
+        return new Entry<>(current - 1, values);
       }
 
       @Override
@@ -282,6 +267,18 @@ public void close()
       {
         // do nothing
       }
+
+      private int findNext(int current)
+      {
+        // shift by +1 since we're looking for the next used slot after the current position
+        for (int i = current + 1; i < cardinalityWithMissingValue; i++) {
+          if (isUsedSlot(i)) {
+            return i;
+          }
+        }
+        // no more slots
+        return -1;
+      }
     };
   }
 }
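Note on the `BufferArrayGrouper` change: the old iterator initialized `cur` via an instance initializer that called `findNext()`, which scanned from `cur + 1` while `cur` was still `0`, so slot 0 (the missing-value slot) could never be visited, and `hasNext()` mutated iterator state as a side effect. The rewrite precomputes the next used slot so `hasNext()` is a pure check. A minimal sketch of the same pattern, using a plain `boolean[]` slot table as an illustrative stand-in for the grouper's buffer (`UsedSlotIterator` is a hypothetical name, not Druid code):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Iterates only the "used" slots of a fixed-size table, precomputing the next
// match so hasNext() is side-effect free and slot 0 is never skipped.
class UsedSlotIterator implements Iterator<Integer>
{
  private final boolean[] used;
  private int next;

  UsedSlotIterator(boolean[] used)
  {
    this.used = used;
    this.next = findNext(-1); // start the scan at slot 0, not slot 1
  }

  @Override
  public boolean hasNext()
  {
    return next >= 0;
  }

  @Override
  public Integer next()
  {
    if (next < 0) {
      throw new NoSuchElementException();
    }
    final int current = next;
    next = findNext(current); // advance eagerly so hasNext() stays a pure check
    return current;
  }

  private int findNext(int current)
  {
    for (int i = current + 1; i < used.length; i++) {
      if (used[i]) {
        return i;
      }
    }
    return -1; // no more used slots
  }
}
```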
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 1b58dfe31b12..e3c609e22c83 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -359,7 +359,7 @@ public boolean hasNext()
           delegate.close();
         }
         delegate = initNewDelegate();
-        return true;
+        return delegate.hasNext();
       } else {
         return false;
       }
@@ -681,7 +681,7 @@ protected void putToMap(Integer key, Map<String, Object> map)
             ((DimensionSelector) dim.getSelector()).lookupName(key)
         );
       } else {
-        map.put(dim.getOutputName(), "");
+        map.put(dim.getOutputName(), NullHandling.defaultStringValue());
       }
     }
   }
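Two one-line fixes in `GroupByQueryEngineV2`. First, `hasNext()` returned `true` immediately after creating a fresh delegate, but a new delegate can be empty (for example, when a filter matches no rows in the next bucket), so the caller must ask the delegate itself. Second, `putToMap` now emits `NullHandling.defaultStringValue()` instead of a hard-coded `""` for the missing-value key, so the output respects the configured null-handling mode. A self-contained sketch of the delegate-chaining pattern and the `hasNext` fix, with a hypothetical `ChainingIterator` over per-bucket lists standing in for Druid's per-cursor delegates:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Chains per-bucket iterators; an early `return true` after swapping in a new
// delegate would wrongly promise an element when the fresh delegate is empty.
class ChainingIterator<T> implements Iterator<T>
{
  private final Iterator<List<T>> buckets;
  private Iterator<T> delegate = Collections.emptyIterator();

  ChainingIterator(Iterator<List<T>> buckets)
  {
    this.buckets = buckets;
  }

  @Override
  public boolean hasNext()
  {
    // keep advancing past empty buckets instead of returning true blindly
    while (!delegate.hasNext() && buckets.hasNext()) {
      delegate = buckets.next().iterator();
    }
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }
}
```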
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index c4c4a0b99af6..1ee0dafec31a 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -25,9 +25,11 @@
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
@@ -38,11 +40,13 @@
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
 import org.apache.druid.query.dimension.RegexFilteredDimensionSpec;
+import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.spec.LegacySegmentSpec;
 import org.apache.druid.query.topn.TopNQuery;
 import org.apache.druid.query.topn.TopNQueryBuilder;
@@ -82,13 +86,15 @@
 @RunWith(Parameterized.class)
 public class MultiValuedDimensionTest
 {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "groupby: {0} forceHashAggregation: {2} ({1})")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance()});
-      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), true});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true});
     }
     return constructors;
   }
@@ -101,7 +107,13 @@ public static Collection<?> constructorFeeder()
 
   private File persistedSegmentDir;
 
-  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+  private IncrementalIndex incrementalIndexNullSampler;
+  private QueryableIndex queryableIndexNullSampler;
+  private File persistedSegmentDirNullSampler;
+
+  private final ImmutableMap<String, Object> context;
+
+  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, boolean forceHashAggregation)
   {
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
         ImmutableList.of(),
@@ -109,6 +121,10 @@ public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOut
         null
     );
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+
+    this.context = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)
+                   ? ImmutableMap.of()
+                   : ImmutableMap.of("forceHashAggregation", forceHashAggregation);
   }
 
   @Before
@@ -147,6 +163,41 @@ public void setup() throws Exception
               .persist(incrementalIndex, persistedSegmentDir, new IndexSpec(), null);
 
     queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
+
+    StringInputRowParser parserNullSampler = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec("time", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags", "othertags")), null, null)
+        ),
+        "UTF-8"
+    );
+
+    incrementalIndexNullSampler = new IncrementalIndex.Builder()
+        .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
+        .setMaxRowCount(5000)
+        .buildOnheap();
+
+    String[] rowsNullSampler = new String[]{
+        "{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
+        "{\"time\":\"2011-01-12T00:00:00.000Z\",\"product\":\"product_2\",\"othertags\":[\"u3\", \"u4\"]}",
+        "{\"time\":\"2011-01-14T00:00:00.000Z\",\"product\":\"product_3\",\"tags\":[\"\"],\"othertags\":[\"u1\", \"u5\"]}",
+        "{\"time\":\"2011-01-15T00:00:00.000Z\",\"product\":\"product_4\",\"tags\":[\"t1\", \"t2\", \"\"],\"othertags\":[\"u6\", \"u7\"]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_5\",\"tags\":[],\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_6\"}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_7\",\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_8\",\"tags\":[\"\"],\"othertags\":[]}"
+    };
+
+    for (String row : rowsNullSampler) {
+      incrementalIndexNullSampler.add(parserNullSampler.parse(row));
+    }
+
+    persistedSegmentDirNullSampler = Files.createTempDir();
+    TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory)
+              .persist(incrementalIndexNullSampler, persistedSegmentDirNullSampler, new IndexSpec(), null);
+
+    queryableIndexNullSampler = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDirNullSampler);
   }
 
   @After
@@ -165,6 +216,7 @@ public void testGroupByNoFilter()
         .setGranularity(Granularities.ALL)
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -200,6 +252,7 @@ public void testGroupByWithDimFilter()
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -221,6 +274,79 @@ public void testGroupByWithDimFilter()
     TestHelper.assertExpectedObjects(expectedResults, result.toList(), "dimFilter");
   }
 
+  @Test
+  public void testGroupByWithDimFilterEmptyResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(new InDimFilter("product", ImmutableList.of("product_5"), null))
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults = Collections.singletonList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L)
+    );
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-empty");
+  }
+
+  @Test
+  public void testGroupByWithDimFilterNullishResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(
+            new InDimFilter("product", ImmutableList.of("product_5", "product_6", "product_8"), null)
+        )
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults;
+    // an empty row (e.g. []) or a missing dimension is grouped as the default string value, "" or null;
+    // after the filter, the grouping inputs are [], null, and [""]
+    if (NullHandling.replaceWithDefault()) {
+      // when SQL-compatible null handling is disabled, the inputs are effectively [], null, [null] and
+      // are all grouped as null
+      expectedResults = Collections.singletonList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 6L)
+      );
+    } else {
+      // with SQL-compatible null handling, null and [] group as null, but [""] groups as ""
+      expectedResults = ImmutableList.of(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 4L),
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "", "count", 2L)
+      );
+    }
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-nullish");
+  }
+
   @Test
   public void testGroupByWithDimFilterAndWithFilteredDimSpec()
   {
@@ -232,6 +358,7 @@ public void testGroupByWithDimFilterAndWithFilteredDimSpec()
         .setDimensions(new RegexFilteredDimensionSpec(new DefaultDimensionSpec("tags", "tags"), "t3"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();
 
     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -264,7 +391,8 @@ public void testTopNWithDimFilterAndWithFilteredDimSpec()
         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
         .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
         .threshold(5)
-        .filters(new SelectorDimFilter("tags", "t3", null)).build();
+        .filters(new SelectorDimFilter("tags", "t3", null))
+        .build();
 
     try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
       QueryRunnerFactory factory = new TopNQueryRunnerFactory(
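(The diff is truncated above.) To make the expectations in `testGroupByWithDimFilterNullishResults` concrete: the filter keeps rows whose `tags` values are `[]`, missing, and `[""]`, each appearing twice because the query runs over two segments. A toy model of how those inputs group under each mode (`NullishGroupingSketch` and `groupKey` are illustrative; the real switch is `NullHandling.replaceWithDefault()`):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models the grouping of nullish multi-value inputs under the two modes.
class NullishGroupingSketch
{
  static String groupKey(List<String> tags, boolean replaceWithDefault)
  {
    // an empty multi-value row or a missing dimension groups as the missing value
    if (tags == null || tags.isEmpty()) {
      return null;
    }
    String v = tags.get(0);
    // in default-value mode "" collapses into null; in SQL-compatible mode it stays ""
    return (replaceWithDefault && "".equals(v)) ? null : v;
  }

  public static void main(String[] args)
  {
    List<List<String>> inputs = Arrays.asList(
        Collections.<String>emptyList(),  // product_5: tags = []
        null,                             // product_6: tags missing
        Collections.singletonList("")     // product_8: tags = [""]
    );
    for (boolean replaceWithDefault : new boolean[]{true, false}) {
      Map<String, Integer> counts = new HashMap<>();
      for (List<String> tags : inputs) {
        counts.merge(groupKey(tags, replaceWithDefault), 2, Integer::sum); // x2 for two segments
      }
      // replaceWithDefault=true  -> {null=6}        (one row, count 6)
      // replaceWithDefault=false -> {null=4, ""=2}  (two rows)
      System.out.println("replaceWithDefault=" + replaceWithDefault + " -> " + counts);
    }
  }
}
```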