Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/java/org/apache/druid/math/expr/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.query.cache.CacheKeyBuilder;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand All @@ -38,8 +40,9 @@
* immutable.
*/
@SubclassesMustOverrideEqualsAndHashCode
public interface Expr
public interface Expr extends Cacheable
{

String NULL_LITERAL = "null";
Joiner ARG_JOINER = Joiner.on(", ");

Expand Down Expand Up @@ -171,6 +174,12 @@ default <T> ExprVectorProcessor<T> buildVectorized(VectorInputBindingInspector i
throw Exprs.cannotVectorize(this);
}

@Override
default byte[] getCacheKey()
{
return new CacheKeyBuilder(Exprs.EXPR_CACHE_KEY).appendString(stringify()).build();
}

/**
* Mechanism to supply input types for the bindings which will back {@link IdentifierExpr}, to use in the aid of
* inferring the output type of an expression with {@link #getOutputType}. A null value means that either the binding
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Exprs.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

public class Exprs
{
public static final byte EXPR_CACHE_KEY = 0x00;
public static final byte LOOKUP_EXPR_CACHE_KEY = 0x01;

public static UnsupportedOperationException cannotVectorize(Expr expr)
{
return new UOE("Unable to vectorize expression:[%s]", expr.stringify());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ private void assertExpr(final String expression, final Object expectedResult)

Assert.assertEquals(expr.stringify(), roundTrip.stringify());
Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}

private void assertExpr(final String expression, final Object[] expectedResult)
Expand All @@ -196,6 +198,8 @@ private void assertExpr(final String expression, final Object[] expectedResult)

Assert.assertEquals(expr.stringify(), roundTrip.stringify());
Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}

private void assertExpr(final String expression, final Double[] expectedResult)
Expand Down Expand Up @@ -224,5 +228,7 @@ private void assertExpr(final String expression, final Double[] expectedResult)

Assert.assertEquals(expr.stringify(), roundTrip.stringify());
Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,14 @@ private void assertExpr(final String expression, @Nullable final Object expected
final Expr roundTrip = Parser.parse(exprNoFlatten.stringify(), ExprMacroTable.nil());
Assert.assertEquals(expr.stringify(), expectedResult, roundTrip.eval(bindings).value());


final Expr roundTripFlatten = Parser.parse(expr.stringify(), ExprMacroTable.nil());
Assert.assertEquals(expr.stringify(), expectedResult, roundTripFlatten.eval(bindings).value());

Assert.assertEquals(expr.stringify(), roundTrip.stringify());
Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}

private void assertArrayExpr(final String expression, @Nullable final Object[] expectedResult)
Expand All @@ -626,5 +629,7 @@ private void assertArrayExpr(final String expression, @Nullable final Object[] e

Assert.assertEquals(expr.stringify(), roundTrip.stringify());
Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.apache.druid.query.aggregation;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand All @@ -40,10 +42,12 @@
import org.apache.druid.segment.virtual.ExpressionVectorSelectors;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

@PublicApi
Expand Down Expand Up @@ -360,4 +364,25 @@ public static VectorValueSelector makeVectorValueSelector(
}
return columnSelectorFactory.makeValueSelector(fieldName);
}

public static Supplier<byte[]> getSimpleAggregatorCacheKeySupplier(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

byte aggregatorType,
String fieldName,
Supplier<Expr> fieldExpression
)
{
return Suppliers.memoize(() -> {
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = Optional.ofNullable(fieldExpression.get())
.map(Expr::getCacheKey)
.orElse(StringUtils.EMPTY_BYTES);

return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(aggregatorType)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
*/
public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private final Supplier<byte[]> cacheKey;

@JsonCreator
public DoubleMaxAggregatorFactory(
@JsonProperty("name") String name,
Expand All @@ -46,6 +47,11 @@ public DoubleMaxAggregatorFactory(
)
{
super(macroTable, name, fieldName, expression);
this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
AggregatorUtil.DOUBLE_MAX_CACHE_TYPE_ID,
fieldName,
fieldExpression
);
}

public DoubleMaxAggregatorFactory(String name, String fieldName)
Expand Down Expand Up @@ -114,15 +120,7 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);

return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.DOUBLE_MAX_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
return cacheKey.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
*/
public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private final Supplier<byte[]> cacheKey;

@JsonCreator
public DoubleMinAggregatorFactory(
@JsonProperty("name") String name,
Expand All @@ -46,6 +47,11 @@ public DoubleMinAggregatorFactory(
)
{
super(macroTable, name, fieldName, expression);
this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
AggregatorUtil.DOUBLE_MIN_CACHE_TYPE_ID,
fieldName,
fieldExpression
);
}

public DoubleMinAggregatorFactory(String name, String fieldName)
Expand Down Expand Up @@ -114,15 +120,7 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);

return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.DOUBLE_MIN_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
return cacheKey.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
*/
public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private final Supplier<byte[]> cacheKey;

@JsonCreator
public DoubleSumAggregatorFactory(
@JsonProperty("name") String name,
Expand All @@ -46,6 +47,11 @@ public DoubleSumAggregatorFactory(
)
{
super(macroTable, name, fieldName, expression);
this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
AggregatorUtil.DOUBLE_SUM_CACHE_TYPE_ID,
fieldName,
fieldExpression
);
}

public DoubleSumAggregatorFactory(String name, String fieldName)
Expand Down Expand Up @@ -114,15 +120,7 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);

return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.DOUBLE_SUM_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
return cacheKey.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ public byte[] getCacheKey()
.appendStrings(fields)
.appendString(initialValueExpressionString)
.appendString(initialCombineValueExpressionString)
.appendString(foldExpressionString)
.appendString(combineExpressionString)
.appendString(compareExpressionString)
.appendString(finalizeExpressionString)
.appendCacheable(foldExpression.get())
.appendCacheable(combineExpression.get())
.appendCacheable(compareExpression.get())
.appendCacheable(finalizeExpression.get())
.appendInt(maxSizeBytes.getBytesInInt())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
*/
public class FloatMaxAggregatorFactory extends SimpleFloatAggregatorFactory
{
private final Supplier<byte[]> cacheKey;

@JsonCreator
public FloatMaxAggregatorFactory(
@JsonProperty("name") String name,
Expand All @@ -46,6 +47,11 @@ public FloatMaxAggregatorFactory(
)
{
super(macroTable, name, fieldName, expression);
this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
AggregatorUtil.FLOAT_MAX_CACHE_TYPE_ID,
fieldName,
fieldExpression
);
}

public FloatMaxAggregatorFactory(String name, String fieldName)
Expand Down Expand Up @@ -114,15 +120,7 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);

return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.FLOAT_MAX_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
return cacheKey.get();
}

@Override
Expand Down
Loading