Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ Only the COUNT aggregation can accept DISTINCT.
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|

For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -102,6 +103,11 @@ public StringFirstAggregatorFactory(
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

// Only negative limits are rejected; zero passes this check, so the message must say ">= 0" rather than "> 0".
if (maxStringBytes != null && maxStringBytes < 0) {
  throw new IAE("maxStringBytes must be greater than or equal to 0");
}

this.name = name;
this.fieldName = fieldName;
this.maxStringBytes = maxStringBytes == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -58,6 +59,11 @@ public StringLastAggregatorFactory(
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

// Only negative limits are rejected; zero passes this check, so the message must say ">= 0" rather than "> 0".
if (maxStringBytes != null && maxStringBytes < 0) {
  throw new IAE("maxStringBytes must be greater than or equal to 0");
}

this.name = name;
this.fieldName = fieldName;
this.maxStringBytes = maxStringBytes == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -40,18 +41,40 @@ private Aggregations()
// No instantiation.
}

/**
* Get Druid expressions that correspond to "simple" aggregator inputs. This is used by standard sum/min/max
* aggregators, which have the following properties:
*
* 1) They can take direct field accesses or expressions as inputs.
* 2) They cannot implicitly cast strings to numbers when using a direct field access.
*
* @param plannerContext SQL planner context
* @param rowSignature input row signature
* @param call aggregate call object
* @param project project that should be applied before aggregation; may be null
*
* @return list of expressions corresponding to aggregator arguments, or null if any cannot be translated
*/
@Nullable
public static List<DruidExpression> getArgumentsForSimpleAggregator(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final AggregateCall call,
final Project project
@Nullable final Project project
)
{
return call.getArgList().stream()
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
.map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
.collect(Collectors.toList());
final List<DruidExpression> args = call
.getArgList()
.stream()
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
.map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
.collect(Collectors.toList());

if (args.stream().noneMatch(Objects::isNull)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here explaining why we do this part? It seems not obvious to me

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added method-level javadocs that explain it:

   * @return list of expressions corresponding to aggregator arguments, or null if any cannot be translated

return args;
} else {
return null;
}
}

private static DruidExpression toDruidExpressionForSimpleAggregator(
Expand All @@ -68,7 +91,8 @@ private static DruidExpression toDruidExpressionForSimpleAggregator(
if (druidExpression.isSimpleExtraction() &&
(!druidExpression.isDirectColumnAccess()
|| rowSignature.getColumnType(druidExpression.getDirectColumn()) == ValueType.STRING)) {
// Aggregators are unable to implicitly cast strings to numbers. So remove the simple extraction in this case.
// Aggregators are unable to implicitly cast strings to numbers.
// So remove the simple extraction, which forces the expression to be used instead of the direct column access.
return druidExpression.map(simpleExtraction -> null, Function.identity());
} else {
return druidExpression;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.aggregation.builtin;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EarliestLatestSqlAggregator implements SqlAggregator
{
public static final SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST);
public static final SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST);

enum EarliestOrLatest
{
EARLIEST {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: leave a comment here reminding people not to rename the enum since the name() is used in the AggFunction below

@Override
AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
{
switch (type) {
case LONG:
return new LongFirstAggregatorFactory(name, fieldName);
case FLOAT:
return new FloatFirstAggregatorFactory(name, fieldName);
case DOUBLE:
return new DoubleFirstAggregatorFactory(name, fieldName);
case STRING:
return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to validate that maxStringBytes >= 0 in both the aggregator factories? I traced through the code and I think an exception will be thrown in the String*BufferAggregator#aggregate because there will be an out of bounds exception.

Also it's not clear to me what the expected result should be if maxStringBytes is 0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation sounds like a nice addition. I can add it after #8834 is merged. Right now, this patch conflicts with that one, and is blocked on it.

I think if maxStringBytes is 0 you'd expect to get an empty string. A little goofy but technically correct. The best kind of correct :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) I asked because I was wondering if we could short circuit that special case. You don't need to compare the timestamps in the aggregator - as long as a string exists for any row, we know the result will be an empty string.

This edge case probably never happens - so again, feel free to ignore

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed the change with the validation.

I think short circuiting the special case isn't super needed, because it's crazy, and who would do it? (Famous last words.)

default:
throw new ISE("Cannot build aggregatorFactory for type[%s]", type);
}
}
},

LATEST {
  /**
   * Picks the "last" aggregator factory matching the requested output type. Only the STRING variant
   * receives {@code maxStringBytes}; the numeric variants are built from name and field name alone.
   */
  @Override
  AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType outputType, int maxStringBytes)
  {
    final AggregatorFactory factory;

    switch (outputType) {
      case LONG:
        factory = new LongLastAggregatorFactory(name, fieldName);
        break;
      case FLOAT:
        factory = new FloatLastAggregatorFactory(name, fieldName);
        break;
      case DOUBLE:
        factory = new DoubleLastAggregatorFactory(name, fieldName);
        break;
      case STRING:
        factory = new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
        break;
      default:
        throw new ISE("Cannot build aggregatorFactory for type[%s]", outputType);
    }

    return factory;
  }
};

/**
 * Builds the aggregator factory for this constant (first-style for EARLIEST, last-style for LATEST).
 *
 * @param name           name given to the created aggregator factory
 * @param fieldName      name of the column the aggregator factory reads from
 * @param outputType     value type selecting which factory is created; the implementations handle
 *                       LONG, FLOAT, DOUBLE and STRING and throw an ISE for any other type
 * @param maxStringBytes passed through to the STRING factories only; not used for the numeric types
 *
 * @return the aggregator factory for the given type
 */
abstract AggregatorFactory createAggregatorFactory(
String name,
String fieldName,
ValueType outputType,
int maxStringBytes
);
}

private final EarliestOrLatest earliestOrLatest;
private final SqlAggFunction function;

/**
 * Private: instances are created only for the static fields of this class. Stores the mode and builds the
 * Calcite function definition for it.
 */
private EarliestLatestSqlAggregator(final EarliestOrLatest earliestOrLatest)
{
  this.function = new EarliestLatestSqlAggFunction(earliestOrLatest);
  this.earliestOrLatest = earliestOrLatest;
}

/**
 * Returns the Calcite aggregate function that was built for this aggregator in the constructor.
 */
@Override
public SqlAggFunction calciteFunction()
{
  return this.function;
}

/**
 * Translates an EARLIEST / LATEST aggregate call into a Druid {@link Aggregation}.
 *
 * The first argument is the value being aggregated. When it is a direct column access the aggregator reads that
 * column; otherwise a virtual column is obtained from the registry and the aggregator reads from it. The optional
 * second argument is read as an integer literal and handed to the aggregator factory as the max string bytes;
 * when it is absent, -1 is handed over instead.
 *
 * When {@code finalizeAggregations} is set, the aggregator gets a prefixed name and a finalizing post-aggregator
 * is attached under the requested {@code name}.
 *
 * @return the aggregation, or null when {@code Expressions.toDruidExpressions} returns null for the arguments
 */
@Nullable
@Override
public Aggregation toDruidAggregation(
    final PlannerContext plannerContext,
    final RowSignature rowSignature,
    final VirtualColumnRegistry virtualColumnRegistry,
    final RexBuilder rexBuilder,
    final String name,
    final AggregateCall aggregateCall,
    final Project project,
    final List<Aggregation> existingAggregations,
    final boolean finalizeAggregations
)
{
  final List<RexNode> operands = aggregateCall
      .getArgList()
      .stream()
      .map(argIndex -> Expressions.fromFieldAccess(rowSignature, project, argIndex))
      .collect(Collectors.toList());

  final List<DruidExpression> druidArgs = Expressions.toDruidExpressions(plannerContext, rowSignature, operands);
  if (druidArgs == null) {
    return null;
  }

  final String aggregatorName;
  if (finalizeAggregations) {
    aggregatorName = Calcites.makePrefixedName(name, "a");
  } else {
    aggregatorName = name;
  }

  // Resolve the column the aggregator reads: the column itself, or a virtual column wrapping the expression.
  final DruidExpression valueArg = druidArgs.get(0);
  final String fieldName;
  if (valueArg.isDirectColumnAccess()) {
    fieldName = valueArg.getDirectColumn();
  } else {
    final SqlTypeName valueTypeName = operands.get(0).getType().getSqlTypeName();
    final VirtualColumn expressionColumn =
        virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, valueArg, valueTypeName);
    fieldName = expressionColumn.getOutputName();
  }

  // Second arg must be a literal, if it exists (the type signature of the function requires it).
  final int maxStringBytes;
  if (operands.size() > 1) {
    maxStringBytes = RexLiteral.intValue(operands.get(1));
  } else {
    maxStringBytes = -1;
  }

  final ValueType outputType = Calcites.getValueTypeForSqlTypeName(aggregateCall.getType().getSqlTypeName());
  if (outputType == null) {
    throw new ISE(
        "Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]",
        aggregateCall.getType().getSqlTypeName(),
        aggregateCall.getName()
    );
  }

  // The registry lookup yields null when fieldName is not a virtual column; in that case the list is empty.
  final List<VirtualColumn> virtualColumns = Stream
      .of(virtualColumnRegistry.getVirtualColumn(fieldName))
      .filter(Objects::nonNull)
      .collect(Collectors.toList());

  final AggregatorFactory aggregatorFactory =
      earliestOrLatest.createAggregatorFactory(aggregatorName, fieldName, outputType, maxStringBytes);

  final FinalizingFieldAccessPostAggregator postAggregator =
      finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null;

  return Aggregation.create(virtualColumns, Collections.singletonList(aggregatorFactory), postAggregator);
}

/**
 * Calcite function definition shared by EARLIEST and LATEST.
 *
 * The SQL function name is taken from the enum constant's {@code name()}, so renaming a constant of
 * {@code EarliestOrLatest} also renames the function defined here.
 */
private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{
  EarliestLatestSqlAggFunction(EarliestOrLatest earliestOrLatest)
  {
    super(
        earliestOrLatest.name(),
        null,
        SqlKind.OTHER_FUNCTION,
        ReturnTypes.ARG0,
        InferTypes.RETURN_TYPE,
        OperandTypes.or(
            // One-argument form: a numeric or boolean expression.
            OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.BOOLEAN),
            // Two-argument form: a string expression followed by a numeric literal.
            OperandTypes.sequence(
                twoArgumentSignature(earliestOrLatest),
                OperandTypes.STRING,
                OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
            )
        ),
        SqlFunctionCategory.STRING,
        false,
        false
    );
  }

  /**
   * Signature text supplied to the operand type checker for the two-argument form.
   */
  private static String twoArgumentSignature(EarliestOrLatest earliestOrLatest)
  {
    return "'" + earliestOrLatest.name() + "(expr, maxBytesPerString)'\n";
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private Expressions()
*/
public static RexNode fromFieldAccess(
final RowSignature rowSignature,
final Project project,
@Nullable final Project project,
final int fieldNumber
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
Expand Down Expand Up @@ -117,6 +118,8 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new ApproxCountDistinctSqlAggregator())
.add(new AvgSqlAggregator())
.add(new CountSqlAggregator())
.add(EarliestLatestSqlAggregator.EARLIEST)
.add(EarliestLatestSqlAggregator.LATEST)
.add(new MinSqlAggregator())
.add(new MaxSqlAggregator())
.add(new SumSqlAggregator())
Expand Down
Loading