Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
private final boolean shouldFinalize;
private final boolean round;

private final boolean processAsArray;

HllSketchAggregatorFactory(
final String name,
final String fieldName,
@Nullable final Integer lgK,
@Nullable final String tgtHllType,
@Nullable final StringEncoding stringEncoding,
final Boolean shouldFinalize,
final boolean round
final boolean round,
final boolean processAsArray
)
{
this.name = Objects.requireNonNull(name);
Expand All @@ -79,6 +82,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
this.stringEncoding = stringEncoding == null ? DEFAULT_STRING_ENCODING : stringEncoding;
this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize;
this.round = round;
this.processAsArray = processAsArray;
}

@Override
Expand Down Expand Up @@ -127,6 +131,13 @@ public boolean isRound()
return round;
}

/**
 * Reports whether this factory was constructed with the {@code processAsArray} flag set.
 *
 * <p>The value is exposed as a JSON property, but is left out of the serialized form when it
 * holds the default value for a {@code boolean} ({@code false}), as requested by
 * {@code JsonInclude.Include.NON_DEFAULT}.
 *
 * @return the {@code processAsArray} value that was passed to the constructor
 */
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isProcessAsArray()
{
  return this.processAsArray;
}

@Override
public List<String> requiredFields()
{
Expand All @@ -149,7 +160,8 @@ public List<AggregatorFactory> getRequiredColumns()
tgtHllType.toString(),
stringEncoding,
shouldFinalize,
round
round,
false
)
);
}
Expand Down Expand Up @@ -284,13 +296,14 @@ public boolean equals(Object o)
&& Objects.equals(name, that.name)
&& Objects.equals(fieldName, that.fieldName)
&& tgtHllType == that.tgtHllType
&& stringEncoding == that.stringEncoding;
&& stringEncoding == that.stringEncoding
&& processAsArray == that.processAsArray;
}

@Override
public int hashCode()
{
return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round);
return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, processAsArray);
}

@Override
Expand All @@ -304,6 +317,7 @@ public String toString()
(stringEncoding != DEFAULT_STRING_ENCODING ? ", stringEncoding=" + stringEncoding : "") +
(shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") +
(round != DEFAULT_ROUND ? ", round=" + round : "") +
(processAsArray ? ", processAsArray=true" : "") +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
{
public static final ColumnType TYPE = ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME);


@JsonCreator
public HllSketchBuildAggregatorFactory(
@JsonProperty("name") final String name,
Expand All @@ -62,10 +64,11 @@ public HllSketchBuildAggregatorFactory(
@JsonProperty("tgtHllType") @Nullable final String tgtHllType,
@JsonProperty("stringEncoding") @Nullable final StringEncoding stringEncoding,
@JsonProperty("shouldFinalize") final Boolean shouldFinalize,
@JsonProperty("round") final boolean round
@JsonProperty("round") final boolean round,
@JsonProperty("processAsArray") final boolean processAsArray
)
{
super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round);
super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, processAsArray);
}


Expand Down Expand Up @@ -144,7 +147,8 @@ public AggregatorFactory withName(String newName)
getTgtHllType(),
getStringEncoding(),
isShouldFinalize(),
isRound()
isRound(),
isProcessAsArray()
);
}

Expand Down Expand Up @@ -223,12 +227,20 @@ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSele
};
break;
case ARRAY:
throw InvalidInput.exception("ARRAY types are not supported for hll sketch");
final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities);
updater = sketch -> {
final Object o = selector.getObject();
if (o != null) {
byte[] bytes = ExprEval.toBytes(expressionType, o);
sketch.get().update(bytes);
}
};
break;
default:
updater = sketch -> {
Object obj = selector.getObject();
if (obj != null) {
HllSketchBuildUtil.updateSketch(sketch.get(), getStringEncoding(), obj);
HllSketchBuildUtil.updateSketch(sketch.get(), getStringEncoding(), obj, isProcessAsArray());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.segment.DimensionDictionarySelector;

import javax.annotation.Nullable;
Expand All @@ -33,19 +34,34 @@

public class HllSketchBuildUtil
{
public static void updateSketch(final HllSketch sketch, final StringEncoding stringEncoding, final Object value)
public static void updateSketch(
final HllSketch sketch,
final StringEncoding stringEncoding,
final Object value,
final boolean processAsArray
)
{
if (value instanceof Integer || value instanceof Long) {
sketch.update(((Number) value).longValue());
} else if (value instanceof Float || value instanceof Double) {
sketch.update(((Number) value).doubleValue());
} else if (value instanceof String) {
updateSketchWithString(sketch, stringEncoding, (String) value);
} else if (value instanceof Object[] && processAsArray) {
byte[] arrayBytes = ExprEval.toBytesBestEffort(value);
sketch.update(arrayBytes);
} else if (value instanceof List) {
// noinspection rawtypes
for (Object entry : (List) value) {
if (entry != null) {
updateSketchWithString(sketch, stringEncoding, entry.toString());
if (processAsArray) {
final ExprEval eval = ExprEval.bestEffortArray((List) value);
final byte[] arrayBytes = ExprEval.toBytes(eval);
sketch.update(arrayBytes);
} else {
// Lists are treated as multi-value strings, which count each element as a separate distinct value
// noinspection rawtypes
for (Object entry : (List) value) {
if (entry != null) {
updateSketchWithString(sketch, stringEncoding, entry.toString());
}
}
}
} else if (value instanceof char[]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public HllSketchMergeAggregatorFactory(
@JsonProperty("round") final boolean round
)
{
super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round);
super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public Aggregation toDruidAggregation(
tgtHllType,
stringEncoding,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
ROUND,
inputType.isArray()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.apache.druid.query.aggregation.datasketches.hll.vector;

import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;
import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
Expand All @@ -29,6 +32,9 @@
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public class HllSketchBuildVectorProcessorFactory implements VectorColumnProcessorFactory<HllSketchBuildVectorProcessor>
{
private final HllSketchBuildBufferAggregatorHelper helper;
Expand Down Expand Up @@ -90,7 +96,40 @@ public HllSketchBuildVectorProcessor makeArrayProcessor(
VectorObjectSelector selector
)
{
throw DruidException.defensive("ARRAY types are not supported for hll sketch");
final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities);
return new HllSketchBuildVectorProcessor()
{
/**
 * Folds the array values of rows {@code [startRow, endRow)} into the single sketch stored at
 * {@code position} in {@code buf}.
 *
 * <p>Each non-null entry of the object vector is serialized to bytes with
 * {@code ExprEval.toBytes} using the column's expression type, and those bytes are fed to the
 * sketch as one value. Null entries are skipped.
 *
 * @param buf      buffer holding the aggregation state
 * @param position offset of the target sketch within {@code buf}
 * @param startRow first row of the vector to aggregate (inclusive)
 * @param endRow   last row of the vector to aggregate (exclusive)
 */
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  final Object[] values = selector.getObjectVector();
  final HllSketch target = helper.getSketchAtPosition(buf, position);

  for (int row = startRow; row < endRow; row++) {
    final Object value = values[row];
    if (value == null) {
      // Null rows contribute nothing to the sketch.
      continue;
    }
    target.update(ExprEval.toBytes(expressionType, value));
  }
}

/**
 * Folds array values into per-row sketches, where row {@code i} of the batch is aggregated into
 * the sketch located at {@code positions[i] + positionOffset} in {@code buf}.
 *
 * <p>When {@code rows} is non-null it maps batch index {@code i} to an index in the object
 * vector; otherwise {@code i} itself is used. Each non-null vector entry is serialized with
 * {@code ExprEval.toBytes} using the column's expression type and fed to the sketch as one
 * value. Null entries are skipped, although the sketch for that position is still looked up.
 *
 * @param buf            buffer holding the aggregation state
 * @param numRows        number of batch entries to process
 * @param positions      per-entry base offsets of the target sketches
 * @param rows           optional mapping from batch index to vector index, or {@code null}
 * @param positionOffset value added to every entry of {@code positions}
 */
@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
  final Object[] values = selector.getObjectVector();

  for (int i = 0; i < numRows; i++) {
    final int rowIndex = rows == null ? i : rows[i];
    final HllSketch target = helper.getSketchAtPosition(buf, positions[i] + positionOffset);

    final Object value = values[rowIndex];
    if (value == null) {
      continue;
    }
    target.update(ExprEval.toBytes(expressionType, value));
  }
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
HllSketchBuildUtil.updateSketch(
sketch,
stringEncoding,
vector[i]
vector[i],
false
);
}
}
Expand All @@ -79,7 +80,8 @@ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable in
HllSketchBuildUtil.updateSketch(
sketch,
stringEncoding,
vector[idx]
vector[idx],
false
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

Expand All @@ -35,14 +36,16 @@ public class SketchAggregator implements Aggregator

private final BaseObjectColumnValueSelector selector;
private final int size;
private final boolean processAsArray;

@Nullable
private Union union;

public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
public SketchAggregator(BaseObjectColumnValueSelector selector, int size, boolean processAsArray)
{
this.selector = selector;
this.size = size;
this.processAsArray = processAsArray;
}

private void initUnion()
Expand All @@ -61,7 +64,7 @@ public void aggregate()
if (union == null) {
initUnion();
}
updateUnion(union, update);
updateUnion(union, update, processAsArray);
}
}

Expand All @@ -85,7 +88,7 @@ public long aggregateWithSize()
initialSketchSize = union.getCurrentBytes();
}

updateUnion(union, update);
updateUnion(union, update, processAsArray);

long sketchSizeDelta = union.getCurrentBytes() - initialSketchSize;
return sketchSizeDelta + unionSizeDelta;
Expand Down Expand Up @@ -132,7 +135,7 @@ public void close()
union = null;
}

static void updateUnion(Union union, Object update)
static void updateUnion(Union union, Object update, boolean processAsArrays)
{
if (update instanceof SketchHolder) {
((SketchHolder) update).updateUnion(union);
Expand All @@ -148,12 +151,21 @@ static void updateUnion(Union union, Object update)
union.update((int[]) update);
} else if (update instanceof long[]) {
union.update((long[]) update);
} else if (update instanceof Object[] && processAsArrays) {
final byte[] arrayBytes = ExprEval.toBytesBestEffort(update);
union.update(arrayBytes);
} else if (update instanceof List) {
for (Object entry : (List) update) {
if (entry != null) {
final String asString = entry.toString();
if (!NullHandling.isNullOrEquivalent(asString)) {
union.update(asString);
if (processAsArrays) {
final ExprEval eval = ExprEval.bestEffortArray((List) update);
final byte[] arrayBytes = ExprEval.toBytes(eval);
union.update(arrayBytes);
} else {
for (Object entry : (List) update) {
if (entry != null) {
final String asString = entry.toString();
if (!NullHandling.isNullOrEquivalent(asString)) {
union.update(asString);
}
}
}
}
Expand Down
Loading