Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.util.Optionality;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
Expand Down Expand Up @@ -63,15 +71,22 @@
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class EarliestLatestAnySqlAggregator implements SqlAggregator
{
public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST);
public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST);
public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE);
public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(
AggregatorType.EARLIEST,
EarliestLatestBySqlAggregator.EARLIEST_BY.calciteFunction()
);
public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(
AggregatorType.LATEST,
EarliestLatestBySqlAggregator.LATEST_BY.calciteFunction()
);
public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE, null);

enum AggregatorType
{
Expand Down Expand Up @@ -164,10 +179,10 @@ abstract AggregatorFactory createAggregatorFactory(
private final AggregatorType aggregatorType;
private final SqlAggFunction function;

private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType)
private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType, final SqlAggFunction replacementAggFunc)
{
this.aggregatorType = aggregatorType;
this.function = new EarliestLatestSqlAggFunction(aggregatorType);
this.function = new EarliestLatestSqlAggFunction(aggregatorType, replacementAggFunc);
}

@Override
Expand Down Expand Up @@ -313,12 +328,48 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
}
}

private static class TimeColIdentifer extends SqlIdentifier
{

public TimeColIdentifer()
{
super("__time", SqlParserPos.ZERO);
}

@Override
public <R> R accept(SqlVisitor<R> visitor)
Comment thread
rohangarg marked this conversation as resolved.
{

try {
return super.accept(visitor);
}
catch (CalciteContextException e) {
if (e.getCause() instanceof SqlValidatorException) {
throw DruidException.forPersona(DruidException.Persona.ADMIN)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw DruidException.forPersona(DruidException.Persona.ADMIN)
throw DruidException.forPersona(DruidException.Persona.USER)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chose ADMIN Persona here since the exceptions in existing tests such as testTimeColumnAggregationsOnLookups had had the same -- not sure why though. If that's incorrect, shall update the tests as well.

Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the reason you are looking for: https://github.com/apache/druid/blob/master/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java#L692

However, in this case, it seems like we do know the exact reason why the query failed - It's due to the absence of a time column. So perhaps change the wording to be more assertive. Also, we should change it to user, because of the reason highlighted in the link above.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the reason will always be absence of time column - what if there's a case of a join where __time column is coming from both the tables, in that case I think it could also be an ambiguous column.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what will admin do with these planning errors even if they are hints. The hints are meant for the end-user. I don't want to hold this PR so would let it merge as it is.

.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
e,
"Query could not be planned. A possible reason is [%s]",
Comment thread
abhishekagarwal87 marked this conversation as resolved.
"LATEST and EARLIEST aggregators implicitly depend on the __time column, but the "
+ "table queried doesn't contain a __time column. Please use LATEST_BY or EARLIEST_BY "
+ "and specify the column explicitly."
);

} else {
throw e;
}
}
}
}

private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{
private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
new EarliestLatestReturnTypeInference(0);

EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
private final SqlAggFunction replacementAggFunc;

EarliestLatestSqlAggFunction(AggregatorType aggregatorType, SqlAggFunction replacementAggFunc)
{
super(
aggregatorType.name(),
Expand All @@ -339,6 +390,43 @@ private static class EarliestLatestSqlAggFunction extends SqlAggFunction
false,
Optionality.FORBIDDEN
);
this.replacementAggFunc = replacementAggFunc;
}

@Override
public SqlNode rewriteCall(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if after this patch constructors of classes like LongLastAggregatorFactory should still accept timeColumn as null or not

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I think it shouldn't. Maybe can take up those modifications in a separate PR.

SqlValidator validator,
SqlCall call
)
{
// Rewrite EARLIEST/LATEST to EARLIEST_BY/LATEST_BY to make
// reference to __time column explicit so that Calcite tracks it

if (replacementAggFunc == null) {
return call;
}

List<SqlNode> operands = call.getOperandList();

SqlParserPos pos = call.getParserPosition();

if (operands.isEmpty() || operands.size() > 2) {
throw InvalidSqlInput.exception(
"Function [%s] expects 1 or 2 arguments but found [%s]",
getName(),
operands.size()
);
}

List<SqlNode> newOperands = new ArrayList<>();
newOperands.add(operands.get(0));
newOperands.add(new TimeColIdentifer());

if (operands.size() == 2) {
newOperands.add(operands.get(1));
}
Comment thread
gargvishesh marked this conversation as resolved.

return replacementAggFunc.createCall(pos, newOperands);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,35 @@ public void testEarliestAggregators()
);
}

@Test
public void testLatestToLatestByConversion()
{
msqIncompatible();
testQuery(
"SELECT LATEST(dim1,10) FROM (SELECT DISTINCT __time, dim1 from foo)",
ImmutableList.of(
new GroupByQuery.Builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING)
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build())
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
new StringLastAggregatorFactory("a0", "d1", "d0", 10))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()),
ImmutableList.of(new Object[]{"abc"})
);
}

@Test
public void testLatestVectorAggregators()
{
Expand Down