Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/content/development/extensions-core/datasketches-hll.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ druid.extensions.loadList=["druid-datasketches"]
"name" : <output name>,
"fieldName" : <metric name>,
"lgK" : <size and accuracy parameter>,
"tgtHllType" : <target HLL type>
"tgtHllType" : <target HLL type>,
"round": <false | true>
}
```

Expand All @@ -51,7 +52,8 @@ druid.extensions.loadList=["druid-datasketches"]
"name" : <output name>,
"fieldName" : <metric name>,
"lgK" : <size and accuracy parameter>,
"tgtHllType" : <target HLL type>
"tgtHllType" : <target HLL type>,
"round": <false | true>
}
```

Expand All @@ -62,6 +64,7 @@ druid.extensions.loadList=["druid-datasketches"]
|fieldName|A String for the name of the input field.|yes|
|lgK|log2 of K, where K is the number of buckets in the sketch; this parameter controls the size and the accuracy. Must be an integer from 4 to 21 inclusive.|no, defaults to 12|
|tgtHllType|The type of the target HLL sketch. Must be "HLL&lowbar;4", "HLL&lowbar;6" or "HLL&lowbar;8" |no, defaults to "HLL&lowbar;4"|
|round|Round off values to whole numbers. Only affects query-time behavior and is ignored at ingestion-time.|no, defaults to false|

### Post Aggregators

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
*/
public abstract class HllSketchAggregatorFactory extends AggregatorFactory
{

public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;

Expand All @@ -51,18 +50,21 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final int lgK;
private final TgtHllType tgtHllType;
private final boolean round;

HllSketchAggregatorFactory(
final String name,
final String fieldName,
@Nullable final Integer lgK,
@Nullable final String tgtHllType
@Nullable final String tgtHllType,
final boolean round
)
{
this.name = Objects.requireNonNull(name);
this.fieldName = Objects.requireNonNull(fieldName);
this.lgK = lgK == null ? DEFAULT_LG_K : lgK;
this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType);
this.round = round;
}

@Override
Expand Down Expand Up @@ -90,6 +92,12 @@ public String getTgtHllType()
return tgtHllType.toString();
}

/**
 * Returns the {@code round} flag that was passed to this factory's constructor.
 * When the flag is set, {@code finalizeComputation} returns {@code Math.round(estimate)}
 * rather than the raw sketch estimate. Annotated as a JSON property.
 *
 * @return {@code true} if finalized estimates are rounded to whole numbers
 */
@JsonProperty
public boolean isRound()
{
return round;
}

@Override
public List<String> requiredFields()
{
Expand All @@ -103,7 +111,9 @@ public List<String> requiredFields()
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString()));
return Collections.singletonList(
new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), round)
);
}

@Override
Expand Down Expand Up @@ -159,13 +169,19 @@ public Class<HllSketch> classOfObject()

@Nullable
@Override
public Double finalizeComputation(@Nullable final Object object)
public Object finalizeComputation(@Nullable final Object object)
{
if (object == null) {
return null;
}
final HllSketch sketch = (HllSketch) object;
return sketch.getEstimate();
final double estimate = sketch.getEstimate();

if (round) {
return Math.round(estimate);
} else {
return estimate;
}
}

@Override
Expand All @@ -177,14 +193,14 @@ public Comparator<HllSketch> getComparator()
@Override
public AggregatorFactory getCombiningFactory()
{
return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType());
return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType(), isRound());
}

@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName)
.appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
.appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
}

@Override
Expand All @@ -209,6 +225,9 @@ public boolean equals(final Object object)
if (!tgtHllType.equals(that.tgtHllType)) {
return false;
}
if (round != that.round) {
return false;
}
return true;
}

Expand All @@ -222,11 +241,12 @@ public int hashCode()
public String toString()
{
return getClass().getSimpleName() + " {"
+ "name=" + name
+ "fieldName=" + fieldName
+ "lgK=" + lgK
+ "tgtHllType=" + tgtHllType
+ "}";
+ " name=" + name
+ ", fieldName=" + fieldName
+ ", lgK=" + lgK
+ ", tgtHllType=" + tgtHllType
+ ", round=" + round
+ " }";
}

protected abstract byte getCacheTypeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public HllSketchBuildAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("lgK") @Nullable final Integer lgK,
@JsonProperty("tgtHllType") @Nullable final String tgtHllType)
@JsonProperty("tgtHllType") @Nullable final String tgtHllType,
@JsonProperty("round") final boolean round
)
{
super(name, fieldName, lgK, tgtHllType);
super(name, fieldName, lgK, tgtHllType, round);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ public HllSketchMergeAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("lgK") @Nullable final Integer lgK,
@JsonProperty("tgtHllType") @Nullable final String tgtHllType
@JsonProperty("tgtHllType") @Nullable final String tgtHllType,
@JsonProperty("round") final boolean round
)
{
super(name, fieldName, lgK, tgtHllType);
super(name, fieldName, lgK, tgtHllType, round);
}

@Override
Expand All @@ -59,10 +60,11 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre
HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other;

return new HllSketchMergeAggregatorFactory(
getName(),
getName(),
Math.max(getLgK(), castedOther.getLgK()),
getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType()
getName(),
getName(),
Math.max(getLgK(), castedOther.getLgK()),
getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(),
isRound() || castedOther.isRound()
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class HllSketchSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";
private static final boolean ROUND = true;

@Override
public SqlAggFunction calciteFunction()
Expand Down Expand Up @@ -134,8 +135,15 @@ public Aggregation toDruidAggregation(
final AggregatorFactory aggregatorFactory;
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;

if (columnArg.isDirectColumnAccess() && rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) {
aggregatorFactory = new HllSketchMergeAggregatorFactory(aggregatorName, columnArg.getDirectColumn(), logK, tgtHllType);
if (columnArg.isDirectColumnAccess()
&& rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) {
aggregatorFactory = new HllSketchMergeAggregatorFactory(
aggregatorName,
columnArg.getDirectColumn(),
logK,
tgtHllType,
ROUND
);
} else {
final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName();
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
Expand All @@ -161,7 +169,8 @@ public Aggregation toDruidAggregation(
aggregatorName,
dimensionSpec.getDimension(),
logK,
tgtHllType
tgtHllType,
ROUND
);
}

Expand Down
Loading