Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class DataSketchesHllBenchmark
null,
null,
null,
null,
false
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctUtf8SqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
Expand Down Expand Up @@ -405,26 +407,35 @@ public class SqlBenchmark
"SELECT * FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100')",
"SELECT * FROM foo WHERE dimSequential > '10' AND dimSequential < '8500'",
"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100') GROUP BY 1, 2",
"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2"

"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2",

// 28, 29, 30, 31: Approximate count distinct of strings
"SELECT APPROX_COUNT_DISTINCT_BUILTIN(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_HLL(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_HLL_UTF8(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_THETA(dimZipf) FROM foo"
);

@Param({"5000000"})
private int rowsPerSegment;

@Param({"false", "force"})
// Can be "false", "true", or "force"
@Param({"force"})
private String vectorize;
@Param({"none", "front-coded-4", "front-coded-16"})

// Can be "none" or "front-coded-N"
@Param({"none", "front-coded-4"})
private String stringEncoding;

@Param({"4", "5", "6", "7", "8", "10", "11", "12", "19", "21", "22", "23", "26", "27"})
@Param({"28", "29", "30", "31"})
private String query;

@Param({STORAGE_MMAP, STORAGE_FRAME_ROW, STORAGE_FRAME_COLUMNAR})
// Can be STORAGE_MMAP, STORAGE_FRAME_ROW, or STORAGE_FRAME_COLUMNAR
@Param({STORAGE_MMAP})
private String storageType;

private SqlEngine engine;

@Nullable
private PlannerFactory plannerFactory;
private final Closer closer = Closer.create();
Expand Down Expand Up @@ -520,13 +531,19 @@ private static DruidOperatorTable createOperatorTable()
try {
final Set<SqlOperatorConversion> extractionOperators = new HashSet<>();
extractionOperators.add(CalciteTests.INJECTOR.getInstance(QueryLookupOperatorConversion.class));
final Set<SqlAggregator> aggregators = new HashSet<>();
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchApproxQuantileSqlAggregator.class));
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchObjectSqlAggregator.class));
final ApproxCountDistinctSqlAggregator countDistinctSqlAggregator =
new ApproxCountDistinctSqlAggregator(new HllSketchApproxCountDistinctSqlAggregator());
aggregators.add(new CountSqlAggregator(countDistinctSqlAggregator));
aggregators.add(countDistinctSqlAggregator);
final Set<SqlAggregator> aggregators = new HashSet<>(
ImmutableList.of(
new DoublesSketchApproxQuantileSqlAggregator(),
new DoublesSketchObjectSqlAggregator(),
new HllSketchApproxCountDistinctSqlAggregator(),
new HllSketchApproxCountDistinctUtf8SqlAggregator(),
new ThetaSketchApproxCountDistinctSqlAggregator(),
new CountSqlAggregator(countDistinctSqlAggregator),
countDistinctSqlAggregator
)
);
return new DruidOperatorTable(aggregators, extractionOperators);
}
catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions codestyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or>
<Class name="org.apache.druid.jackson.DefaultTrueJsonIncludeFilter"/>
<Class name="org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$BatchSizeJsonIncludeFilter"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
Expand All @@ -47,6 +49,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
public static final boolean DEFAULT_SHOULD_FINALIZE = true;
public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;
public static final StringEncoding DEFAULT_STRING_ENCODING = StringEncoding.UTF16LE;

static final Comparator<HllSketchHolder> COMPARATOR =
Comparator.nullsFirst(Comparator.comparingDouble(HllSketchHolder::getEstimate));
Expand All @@ -55,6 +58,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final int lgK;
private final TgtHllType tgtHllType;
private final StringEncoding stringEncoding;
private final boolean shouldFinalize;
private final boolean round;

Expand All @@ -63,6 +67,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
final String fieldName,
@Nullable final Integer lgK,
@Nullable final String tgtHllType,
@Nullable final StringEncoding stringEncoding,
final Boolean shouldFinalize,
final boolean round
)
Expand All @@ -71,6 +76,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
this.fieldName = Objects.requireNonNull(fieldName);
this.lgK = lgK == null ? DEFAULT_LG_K : lgK;
this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType);
this.stringEncoding = stringEncoding == null ? DEFAULT_STRING_ENCODING : stringEncoding;
this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize;
this.round = round;
}
Expand Down Expand Up @@ -100,6 +106,13 @@ public String getTgtHllType()
return tgtHllType.toString();
}

@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = StringEncodingDefaultUTF16LEJsonIncludeFilter.class)
public StringEncoding getStringEncoding()
{
return stringEncoding;
}

@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class)
public boolean isShouldFinalize()
Expand All @@ -121,14 +134,23 @@ public List<String> requiredFields()
}

/**
* This is a convoluted way to return a list of input field names this aggregator needs.
* Currently the returned factories are only used to obtain a field name by calling getName() method.
* Used by groupBy v1 to create a "transfer aggregator".
*
* {@inheritDoc}
*/
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(
new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), shouldFinalize, round)
new HllSketchBuildAggregatorFactory(
fieldName,
fieldName,
lgK,
tgtHllType.toString(),
stringEncoding,
shouldFinalize,
round
)
);
}

Expand Down Expand Up @@ -228,6 +250,7 @@ public AggregatorFactory getCombiningFactory()
getName(),
getLgK(),
getTgtHllType(),
getStringEncoding(),
isShouldFinalize(),
isRound()
);
Expand All @@ -236,8 +259,13 @@ public AggregatorFactory getCombiningFactory()
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName)
.appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
return new CacheKeyBuilder(getCacheTypeId())
.appendString(name)
.appendString(fieldName)
.appendInt(lgK)
.appendInt(tgtHllType.ordinal())
.appendCacheable(stringEncoding)
.build();
}

@Override
Expand All @@ -255,13 +283,14 @@ public boolean equals(Object o)
&& round == that.round
&& Objects.equals(name, that.name)
&& Objects.equals(fieldName, that.fieldName)
&& tgtHllType == that.tgtHllType;
&& tgtHllType == that.tgtHllType
&& stringEncoding == that.stringEncoding;
}

@Override
public int hashCode()
{
return Objects.hash(name, fieldName, lgK, tgtHllType, shouldFinalize, round);
return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round);
}

@Override
Expand All @@ -272,6 +301,7 @@ public String toString()
", fieldName='" + fieldName + '\'' +
", lgK=" + lgK +
", tgtHllType=" + tgtHllType +
(stringEncoding != DEFAULT_STRING_ENCODING ? ", stringEncoding=" + stringEncoding : "") +
(shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") +
(round != DEFAULT_ROUND ? ", round=" + round : "") +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,27 @@

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
public class HllSketchBuildAggregator implements Aggregator
{

private final ColumnValueSelector<Object> selector;
private final Consumer<Supplier<HllSketch>> processor;
private HllSketch sketch;

public HllSketchBuildAggregator(
final ColumnValueSelector<Object> selector,
final Consumer<Supplier<HllSketch>> processor,
final int lgK,
final TgtHllType tgtHllType
)
{
this.selector = selector;
this.processor = processor;
this.sketch = new HllSketch(lgK, tgtHllType);
}

Expand All @@ -54,15 +51,9 @@ public HllSketchBuildAggregator(
* See https://github.com/druid-io/druid/pull/3956
*/
@Override
public void aggregate()
public synchronized void aggregate()
{
final Object value = selector.getObject();
if (value == null) {
return;
}
synchronized (this) {
updateSketch(sketch, value);
}
processor.accept(() -> sketch);
}

/*
Expand Down Expand Up @@ -93,36 +84,4 @@ public long getLong()
{
throw new UnsupportedOperationException("Not implemented");
}

static void updateSketch(final HllSketch sketch, final Object value)
{
if (value instanceof Integer || value instanceof Long) {
sketch.update(((Number) value).longValue());
} else if (value instanceof Float || value instanceof Double) {
sketch.update(((Number) value).doubleValue());
} else if (value instanceof String) {
sketch.update(((String) value).toCharArray());
} else if (value instanceof List) {
// noinspection rawtypes
for (Object entry : (List) value) {
if (entry != null) {
final String asString = entry.toString();
if (!NullHandling.isNullOrEquivalent(asString)) {
sketch.update(asString);
}
}
}
} else if (value instanceof char[]) {
sketch.update((char[]) value);
} else if (value instanceof byte[]) {
sketch.update((byte[]) value);
} else if (value instanceof int[]) {
sketch.update((int[]) value);
} else if (value instanceof long[]) {
sketch.update((long[]) value);
} else {
throw new IAE("Unsupported type " + value.getClass());
}
}

}
Loading