Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,55 +233,52 @@ public CloseableIterator<Entry<Integer>> iterator(boolean sorted)

return new CloseableIterator<Entry<Integer>>()
{
int cur;
boolean findNext = false;

{
cur = findNext();
}
// initialize to the first used slot
private int next = findNext(-1);

@Override
public boolean hasNext()
{
if (findNext) {
cur = findNext();
findNext = false;
}
return cur >= 0;
}

private int findNext()
{
for (int i = cur + 1; i < cardinalityWithMissingValue; i++) {
if (isUsedSlot(i)) {
return i;
}
}
return -1;
return next >= 0;
}

@Override
public Entry<Integer> next()
{
if (cur < 0) {
if (next < 0) {
throw new NoSuchElementException();
}

findNext = true;
final int current = next;
next = findNext(current);

final Object[] values = new Object[aggregators.length];
final int recordOffset = cur * recordSize;
final int recordOffset = current * recordSize;
for (int i = 0; i < aggregators.length; i++) {
values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]);
}
return new Entry<>(cur - 1, values);
// shift by -1 since values are initially shifted by +1 so they are all positive and
// GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1
return new Entry<>(current - 1, values);
}

@Override
public void close()
{
// do nothing
}

private int findNext(int current)
{
// shift by +1 since we're looking for the next used slot after the current position
for (int i = current + 1; i < cardinalityWithMissingValue; i++) {
if (isUsedSlot(i)) {
return i;
}
}
// no more slots
return -1;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public boolean hasNext()
delegate.close();
}
delegate = initNewDelegate();
return true;
return delegate.hasNext();
} else {
return false;
}
Expand Down Expand Up @@ -681,7 +681,7 @@ protected void putToMap(Integer key, Map<String, Object> map)
((DimensionSelector) dim.getSelector()).lookupName(key)
);
} else {
map.put(dim.getOutputName(), "");
map.put(dim.getOutputName(), NullHandling.defaultStringValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
Expand All @@ -38,11 +40,13 @@
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
import org.apache.druid.query.dimension.RegexFilteredDimensionSpec;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
Expand Down Expand Up @@ -82,13 +86,15 @@
@RunWith(Parameterized.class)
public class MultiValuedDimensionTest
{
@Parameterized.Parameters(name = "{0}")
@Parameterized.Parameters(name = "groupby: {0} forceHashAggregation: {2} ({1})")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance()});
constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), false});
constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), false});
constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), true});
constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true});
}
return constructors;
}
Expand All @@ -101,14 +107,24 @@ public static Collection<?> constructorFeeder()

private File persistedSegmentDir;

public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
private IncrementalIndex incrementalIndexNullSampler;
private QueryableIndex queryableIndexNullSampler;
private File persistedSegmentDirNullSampler;

private final ImmutableMap<String, Object> context;

public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, boolean forceHashAggregation)
{
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
ImmutableList.of(),
config,
null
);
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;

this.context = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)
? ImmutableMap.of()
: ImmutableMap.of("forceHashAggregation", forceHashAggregation);
}

@Before
Expand Down Expand Up @@ -147,6 +163,41 @@ public void setup() throws Exception
.persist(incrementalIndex, persistedSegmentDir, new IndexSpec(), null);

queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);



StringInputRowParser parserNullSampler = new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags", "othertags")), null, null)
),
"UTF-8"
);

incrementalIndexNullSampler = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(5000)
.buildOnheap();

String[] rowsNullSampler = new String[]{
"{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
"{\"time\":\"2011-01-12T00:00:00.000Z\",\"product\":\"product_2\",\"othertags\":[\"u3\", \"u4\"]}",
"{\"time\":\"2011-01-14T00:00:00.000Z\",\"product\":\"product_3\",\"tags\":[\"\"],\"othertags\":[\"u1\", \"u5\"]}",
"{\"time\":\"2011-01-15T00:00:00.000Z\",\"product\":\"product_4\",\"tags\":[\"t1\", \"t2\", \"\"],\"othertags\":[\"u6\", \"u7\"]}",
"{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_5\",\"tags\":[],\"othertags\":[]}",
"{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_6\"}",
"{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_7\",\"othertags\":[]}",
"{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_8\",\"tags\":[\"\"],\"othertags\":[]}"
};

for (String row : rowsNullSampler) {
incrementalIndexNullSampler.add(parserNullSampler.parse(row));
}
persistedSegmentDirNullSampler = Files.createTempDir();
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory)
.persist(incrementalIndexNullSampler, persistedSegmentDirNullSampler, new IndexSpec(), null);

queryableIndexNullSampler = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDirNullSampler);
}

@After
Expand All @@ -165,6 +216,7 @@ public void testGroupByNoFilter()
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("tags", "tags"))
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.setContext(context)
.build();

Sequence<Row> result = helper.runQueryOnSegmentsObjs(
Expand Down Expand Up @@ -200,6 +252,7 @@ public void testGroupByWithDimFilter()
.setDimensions(new DefaultDimensionSpec("tags", "tags"))
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.setDimFilter(new SelectorDimFilter("tags", "t3", null))
.setContext(context)
.build();

Sequence<Row> result = helper.runQueryOnSegmentsObjs(
Expand All @@ -221,6 +274,79 @@ public void testGroupByWithDimFilter()
TestHelper.assertExpectedObjects(expectedResults, result.toList(), "dimFilter");
}

@Test
public void testGroupByWithDimFilterEmptyResults()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("xx")
.setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("tags", "tags"))
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.setDimFilter(new InDimFilter("product", ImmutableList.of("product_5"), null))
.setContext(context)
.build();

Sequence<Row> result = helper.runQueryOnSegmentsObjs(
ImmutableList.of(
new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
),
query
);

List<Row> expectedResults = Collections.singletonList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L)
);

TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-empty");
}

@Test
public void testGroupByWithDimFilterNullishResults()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource("xx")
.setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("tags", "tags"))
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.setDimFilter(
new InDimFilter("product", ImmutableList.of("product_5", "product_6", "product_8"), null)
)
.setContext(context)
.build();

Sequence<Row> result = helper.runQueryOnSegmentsObjs(
ImmutableList.of(
new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
),
query
);

List<Row> expectedResults;
// an empty row e.g. [], or group by 'missing' value, is grouped with the default string value, "" or null
// grouping input is filtered to [], null, [""]
if (NullHandling.replaceWithDefault()) {
// when sql compatible null handling is disabled, the inputs are effectively [], null, [null] and
// are all grouped as null
expectedResults = Collections.singletonList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 6L)
);
} else {
// with sql compatible null handling, null and [] = null, but [""] = ""
expectedResults = ImmutableList.of(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 4L),
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "", "count", 2L)
);
}

TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-nullish");
}

@Test
public void testGroupByWithDimFilterAndWithFilteredDimSpec()
{
Expand All @@ -232,6 +358,7 @@ public void testGroupByWithDimFilterAndWithFilteredDimSpec()
.setDimensions(new RegexFilteredDimensionSpec(new DefaultDimensionSpec("tags", "tags"), "t3"))
.setAggregatorSpecs(new CountAggregatorFactory("count"))
.setDimFilter(new SelectorDimFilter("tags", "t3", null))
.setContext(context)
.build();

Sequence<Row> result = helper.runQueryOnSegmentsObjs(
Expand Down Expand Up @@ -264,7 +391,8 @@ public void testTopNWithDimFilterAndWithFilteredDimSpec()
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null)).build();
.filters(new SelectorDimFilter("tags", "t3", null))
.build();

try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
Expand Down