Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.IndexedUtf8ValueIndexes;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.serde.StringUtf8ColumnIndexSupplier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class DictionaryEncodedStringIndexSupplierBenchmark
public static class BenchmarkState
{
@Nullable
private IndexedUtf8ValueSetIndex<?> stringValueSetIndex;
private IndexedUtf8ValueIndexes<?> stringValueSetIndex;
private final TreeSet<ByteBuffer> values = new TreeSet<>();
private static final int START_INT = 10_000_000;

Expand Down Expand Up @@ -112,7 +112,7 @@ public void setup()
);
StringUtf8ColumnIndexSupplier<?> indexSupplier =
new StringUtf8ColumnIndexSupplier<>(bitmapFactory, dictionaryUtf8::singleThreaded, bitmaps, null);
stringValueSetIndex = (IndexedUtf8ValueSetIndex<?>) indexSupplier.as(StringValueSetIndex.class);
stringValueSetIndex = (IndexedUtf8ValueIndexes<?>) indexSupplier.as(StringValueSetIndexes.class);
List<Integer> filterValues = new ArrayList<>();
List<Integer> nonFilterValues = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
Expand Down Expand Up @@ -444,7 +443,7 @@ public void testEmptyTimeseriesResults()
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(numericEquality("dim2", 0L, ColumnType.LONG))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
Expand Down Expand Up @@ -476,19 +475,19 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a1:agg", "qsketch_m1", 100),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.Aggregator;
Expand Down Expand Up @@ -221,6 +222,8 @@ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSele
}
};
break;
case ARRAY:
throw InvalidInput.exception("ARRAY types are not supported for hll sketch");
default:
updater = sketch -> {
Object obj = selector.getObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.aggregation.datasketches.hll.vector;

import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;
import org.apache.druid.segment.VectorColumnProcessorFactory;
Expand Down Expand Up @@ -83,6 +84,15 @@ public HllSketchBuildVectorProcessor makeLongProcessor(ColumnCapabilities capabi
return new LongHllSketchBuildVectorProcessor(helper, selector);
}

@Override
public HllSketchBuildVectorProcessor makeArrayProcessor(
ColumnCapabilities capabilities,
VectorObjectSelector selector
)
{
throw DruidException.defensive("ARRAY types are not supported for hll sketch");
}

@Override
public HllSketchBuildVectorProcessor makeObjectProcessor(
ColumnCapabilities capabilities,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllDoublesSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllFloatsSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new NoopDoublesSketchBufferAggregator();
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.datasketches.thetacommon.ThetaUtil;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
Expand All @@ -39,6 +40,7 @@
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -78,13 +80,21 @@ public SketchAggregatorFactory(String name, String fieldName, Integer size, byte
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
final SketchAggregator aggregator = new SketchAggregator(selector, size);
return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
Expand All @@ -94,6 +104,10 @@ public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public Supplier<Object[]> makeLongProcessor(ColumnCapabilities capabilities, Vec
};
}

@Override
public Supplier<Object[]> makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return selector::getObjectVector;
}

@Override
public Supplier<Object[]> makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
private static final List<AggregatorFactory> EXPECTED_FILTERED_AGGREGATORS =
EXPECTED_PA_AGGREGATORS.stream()
.limit(5)
.map(factory -> new FilteredAggregatorFactory(factory, selector("dim2", "a", null)))
.map(factory -> new FilteredAggregatorFactory(factory, equality("dim2", "a", ColumnType.STRING)))
.collect(Collectors.toList());

/**
Expand Down Expand Up @@ -344,7 +344,7 @@ public void testApproxCountDistinctHllSketch()
new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
not(equality("dim2", "", ColumnType.STRING))
),
new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND),
new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND),
Expand Down Expand Up @@ -436,7 +436,7 @@ public void testAvgDailyCountDistinctHllSketch()
new LongSumAggregatorFactory("_a0:sum", "a0"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("a0", null, null))
notNull("a0")
)
)
)
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testApproxCountDistinctHllSketchIsRounded()
new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true)
)
)
.setHavingSpec(having(selector("a0", "2", null)))
.setHavingSpec(having(equality("a0", 2L, ColumnType.LONG)))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
Expand Down Expand Up @@ -852,7 +852,7 @@ public void testEmptyTimeseriesResults()
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(numericEquality("dim2", 0L, ColumnType.LONG))
.granularity(Granularities.ALL)
.aggregators(
aggregators(
Expand Down Expand Up @@ -895,7 +895,7 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
Expand All @@ -911,7 +911,7 @@ public void testGroupByAggregatorDefaultValues()
null,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory(
Expand All @@ -923,7 +923,7 @@ public void testGroupByAggregatorDefaultValues()
false,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down Expand Up @@ -954,19 +954,19 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down
Loading