Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr;

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.TypeStrategy;

import javax.annotation.Nullable;
import java.util.List;
import java.util.stream.Collectors;

public class BuiltInExprMacros
{
public static class ComplexDecodeBase64ExprMacro implements ExprMacroTable.ExprMacro
{
public static final String NAME = "complex_decode_base64";

@Override
public String name()
{
return NAME;
}

@Override
public Expr apply(List<Expr> args)
{
return new ComplexDecodeBase64Expression(args);
}

final class ComplexDecodeBase64Expression extends ExprMacroTable.BaseScalarMacroFunctionExpr
{
private final ExpressionType complexType;
private final TypeStrategy<?> typeStrategy;

public ComplexDecodeBase64Expression(List<Expr> args)
{
super(NAME, args);
validationHelperCheckArgumentCount(args, 2);
final Expr arg0 = args.get(0);

if (!arg0.isLiteral()) {
throw validationFailed(
"first argument must be constant STRING expression containing a valid complex type name but got '%s' instead",
arg0.stringify()
);
}
if (arg0.isNullLiteral()) {
throw validationFailed("first argument must be constant STRING expression containing a valid complex type name but got NULL instead");
}
final Object literal = arg0.getLiteralValue();
if (!(literal instanceof String)) {
throw validationFailed(
"first argument must be constant STRING expression containing a valid complex type name but got '%s' instead",
arg0.getLiteralValue()
);
}

this.complexType = ExpressionTypeFactory.getInstance().ofComplex((String) literal);
try {
this.typeStrategy = complexType.getStrategy();
}
catch (IllegalArgumentException illegal) {
throw validationFailed(
"first argument must be a valid COMPLEX type name, got unknown COMPLEX type [%s]",
complexType.asTypeString()
);
}
}

@Override
public ExprEval<?> eval(ObjectBinding bindings)
{
ExprEval<?> toDecode = args.get(1).eval(bindings);
if (toDecode.value() == null) {
return ExprEval.ofComplex(complexType, null);
}
final Object serializedValue = toDecode.value();
final byte[] base64;
if (serializedValue instanceof String) {
base64 = StringUtils.decodeBase64String(toDecode.asString());
} else if (serializedValue instanceof byte[]) {
base64 = (byte[]) serializedValue;
} else if (complexType.getComplexTypeName().equals(toDecode.type().getComplexTypeName())) {
// pass it through, it is already the right thing
return toDecode;
} else {
throw validationFailed(
"second argument must be a base64 encoded STRING value but got %s instead",
toDecode.type()
);
}

return ExprEval.ofComplex(complexType, typeStrategy.fromBytes(base64));
}

@Override
public Expr visit(Shuttle shuttle)
{
List<Expr> newArgs = args.stream().map(x -> x.visit(shuttle)).collect(Collectors.toList());
return shuttle.visit(new ComplexDecodeBase64Expression(newArgs));
}

@Nullable
@Override
public ExpressionType getOutputType(InputBindingInspector inspector)
{
return complexType;
}

@Override
public boolean isLiteral()
{
return args.get(1).isLiteral();
}

@Override
public boolean isNullLiteral()
{
return args.get(1).isNullLiteral();
}

@Nullable
@Override
public Object getLiteralValue()
{
return eval(InputBindings.nilBindings()).value();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.StringUtils;

Expand All @@ -42,18 +43,18 @@
*/
public class ExprMacroTable
{
private static final List<ExprMacro> BUILT_IN = ImmutableList.of(
new BuiltInExprMacros.ComplexDecodeBase64ExprMacro()
);
private static final ExprMacroTable NIL = new ExprMacroTable(Collections.emptyList());

private final Map<String, ExprMacro> macroMap;

public ExprMacroTable(final List<ExprMacro> macros)
{
this.macroMap = macros.stream().collect(
Collectors.toMap(
m -> StringUtils.toLowerCase(m.name()),
m -> m
)
);
this.macroMap = Maps.newHashMapWithExpectedSize(BUILT_IN.size() + macros.size());
macroMap.putAll(BUILT_IN.stream().collect(Collectors.toMap(m -> StringUtils.toLowerCase(m.name()), m -> m)));
macroMap.putAll(macros.stream().collect(Collectors.toMap(m -> StringUtils.toLowerCase(m.name()), m -> m)));
}

public static ExprMacroTable nil()
Expand Down
74 changes: 0 additions & 74 deletions core/src/main/java/org/apache/druid/math/expr/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@
import org.apache.druid.math.expr.vector.VectorProcessors;
import org.apache.druid.math.expr.vector.VectorStringProcessors;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.TypeStrategy;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;

import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -3683,76 +3681,4 @@ protected HumanReadableBytes.UnitSystem getUnitSystem()
return HumanReadableBytes.UnitSystem.DECIMAL;
}
}

class ComplexDecodeBase64Function implements Function
{
@Override
public String name()
{
return "complex_decode_base64";
}

@Override
public ExprEval apply(List<Expr> args, Expr.ObjectBinding bindings)
{
ExprEval arg0 = args.get(0).eval(bindings);
if (!arg0.type().is(ExprType.STRING)) {
throw validationFailed(
"first argument must be constant STRING expression containing a valid complex type name but got %s instead",
arg0.type()
);
}
ExpressionType type = ExpressionTypeFactory.getInstance().ofComplex((String) args.get(0).getLiteralValue());
TypeStrategy strategy;
try {
strategy = type.getStrategy();
}
catch (IllegalArgumentException illegal) {
throw validationFailed(
"first argument must be a valid COMPLEX type name, got unknown COMPLEX type [%s]",
type.asTypeString()
);
}
ExprEval base64String = args.get(1).eval(bindings);
if (!base64String.type().is(ExprType.STRING)) {
throw validationFailed(
"second argument must be a base64 encoded STRING value but got %s instead",
base64String.type()
);
}
if (base64String.value() == null) {
return ExprEval.ofComplex(type, null);
}

final byte[] base64 = StringUtils.decodeBase64String(base64String.asString());
return ExprEval.ofComplex(type, strategy.read(ByteBuffer.wrap(base64)));
}

@Override
public void validateArguments(List<Expr> args)
{
validationHelperCheckArgumentCount(args, 2);
if (!args.get(0).isLiteral() || args.get(0).isNullLiteral()) {
throw validationFailed(
"first argument must be constant STRING expression containing a valid COMPLEX type name"
);
}
}

@Nullable
@Override
public ExpressionType getOutputType(
Expr.InputBindingInspector inspector,
List<Expr> args
)
{
ExpressionType arg0Type = args.get(0).getOutputType(inspector);
if (arg0Type == null || !arg0Type.is(ExprType.STRING)) {
throw validationFailed(
"first argument must be constant STRING expression containing a valid COMPLEX type name"
);
}
return ExpressionTypeFactory.getInstance().ofComplex((String) args.get(0).getLiteralValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public interface TypeStrategy<T> extends Comparator<T>
*/
int estimateSizeBytes(T value);


/**
* Read a non-null value from the {@link ByteBuffer} at the current {@link ByteBuffer#position()}. This will move
* the underlying position by the size of the value read.
Expand Down Expand Up @@ -150,4 +149,18 @@ default int write(ByteBuffer buffer, int offset, T value, int maxSizeBytes)
buffer.position(oldPosition);
}
}

/**
* Translate raw byte array into a value. This is primarily useful for transforming self contained values that are
* serialized into byte arrays, such as happens with 'COMPLEX' types which serialize to base64 strings in JSON
* responses.
*
* 'COMPLEX' types should implement this method to participate in the expression systems built-in function
* to deserialize base64 encoded values,
* {@link org.apache.druid.math.expr.BuiltInExprMacros.ComplexDecodeBase64ExprMacro}.
*/
default T fromBytes(byte[] value)
{
throw new IllegalStateException("Not supported");
}
}
15 changes: 14 additions & 1 deletion core/src/test/java/org/apache/druid/math/expr/FunctionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -924,12 +924,25 @@ public void testComplexDecodeBaseWrongArgCount()
);
}

@Test
public void testComplexDecodeBaseArg0Null()
{
expectedException.expect(ExpressionValidationException.class);
expectedException.expectMessage(
"Function[complex_decode_base64] first argument must be constant STRING expression containing a valid complex type name but got NULL instead"
);
assertExpr(
"complex_decode_base64(null, string)",
null
);
}

@Test
public void testComplexDecodeBaseArg0BadType()
{
expectedException.expect(ExpressionValidationException.class);
expectedException.expectMessage(
"Function[complex_decode_base64] first argument must be constant STRING expression containing a valid complex type name but got LONG instead"
"Function[complex_decode_base64] first argument must be constant STRING expression containing a valid complex type name but got '1' instead"
);
assertExpr(
"complex_decode_base64(1, string)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,5 +681,11 @@ public int write(ByteBuffer buffer, NullableLongPair value, int maxSizeBytes)
}
return written;
}

@Override
public NullableLongPair fromBytes(byte[] value)
{
return read(ByteBuffer.wrap(value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,25 @@ public Aggregation toDruidAggregation(
dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
}

aggregatorFactory = new HllSketchBuildAggregatorFactory(
aggregatorName,
dimensionSpec.getDimension(),
logK,
tgtHllType,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
);
if (inputType.is(ValueType.COMPLEX)) {
aggregatorFactory = new HllSketchMergeAggregatorFactory(
aggregatorName,
dimensionSpec.getOutputName(),
logK,
tgtHllType,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
);
} else {
aggregatorFactory = new HllSketchBuildAggregatorFactory(
aggregatorName,
dimensionSpec.getDimension(),
logK,
tgtHllType,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
);
}
}

return toAggregation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public T read(ByteBuffer buffer)
{
final int complexLength = buffer.getInt();
ByteBuffer dupe = buffer.duplicate();
dupe.order(buffer.order());
dupe.limit(dupe.position() + complexLength);
return objectStrategy.fromByteBuffer(dupe, complexLength);
}
Expand Down Expand Up @@ -85,4 +86,10 @@ public int compare(T o1, T o2)
{
return objectStrategy.compare(o1, o2);
}

@Override
public T fromBytes(byte[] value)
{
return objectStrategy.fromByteBuffer(ByteBuffer.wrap(value), value.length);
}
}
Loading