diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index e2754cacbeb1..c1611f52db86 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker } @Override - public QueryResponse runQuery(final DruidQuery druidQuery) + public QueryResponse runQuery(final DruidQuery druidQuery) { String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java index 172506080606..f0cd7318f644 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java @@ -147,8 +147,8 @@ public Response doPost( final String sqlQueryId = stmt.sqlQueryId(); try { final DirectStatement.ResultSet plan = stmt.plan(); - final QueryResponse response = plan.run(); - final Sequence sequence = response.getResults(); + final QueryResponse response = plan.run(); + final Sequence sequence = response.getResults(); final SqlRowTransformer rowTransformer = plan.createRowTransformer(); final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index a1934f070a3c..18ad9f050de2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -439,7 +439,7 @@ public void testIncorrectInsertQuery() .setExpectedValidationErrorMatcher(CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( - "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")) + "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause")) )) .verifyPlanningErrors(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 131a9fa91b5b..ca7d5d320326 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -187,7 +187,7 @@ public void testReplaceIncorrectSyntax() CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")) + "Missing time chunk information in OVERWRITE clause for REPLACE. 
Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")) ) ) .verifyPlanningErrors(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 10c67e27a28b..12b353268adf 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -697,7 +697,7 @@ public void testSelectOnInformationSchemaSource() CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( - "Cannot query table [INFORMATION_SCHEMA.SCHEMATA] with SQL engine 'msq-task'.")) + "Cannot query table INFORMATION_SCHEMA.SCHEMATA with SQL engine 'msq-task'.")) ) ) .verifyPlanningErrors(); @@ -712,7 +712,7 @@ public void testSelectOnSysSource() CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( - "Cannot query table [sys.segments] with SQL engine 'msq-task'.")) + "Cannot query table sys.segments with SQL engine 'msq-task'.")) ) ) .verifyPlanningErrors(); @@ -727,7 +727,7 @@ public void testSelectOnSysSourceWithJoin() CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( - "Cannot query table [sys.segments] with SQL engine 'msq-task'.")) + "Cannot query table sys.segments with SQL engine 'msq-task'.")) ) ) .verifyPlanningErrors(); @@ -743,7 +743,7 @@ public void testSelectOnSysSourceContainingWith() CoreMatchers.allOf( CoreMatchers.instanceOf(SqlPlanningException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( - "Cannot query table [sys.segments] with SQL engine 'msq-task'.")) + "Cannot query table sys.segments with SQL engine 'msq-task'.")) ) ) .verifyPlanningErrors(); diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index b4d80d01d93c..40e5267b80b3 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -128,7 +128,6 @@ public QueryLifecycle( this.startNs = startNs; } - /** * For callers who have already authorized their query, and where simplicity is desired over flexibility. This method * does it all in one call. Logs and metrics are emitted when the Sequence is either fully iterated or throws an @@ -140,8 +139,7 @@ public QueryLifecycle( * * @return results */ - @SuppressWarnings("unchecked") - public QueryResponse runSimple( + public QueryResponse runSimple( final Query query, final AuthenticationResult authenticationResult, final Access authorizationResult @@ -151,7 +149,7 @@ public QueryResponse runSimple( final Sequence results; - final QueryResponse queryResponse; + final QueryResponse queryResponse; try { preAuthorized(authenticationResult, authorizationResult); if (!authorizationResult.isAllowed()) { @@ -172,7 +170,7 @@ public QueryResponse runSimple( * cannot be moved into execute(). 
We leave this as an exercise for the future, however as this oddity * was discovered while just trying to expose HTTP response headers */ - return new QueryResponse( + return new QueryResponse( Sequences.wrap( results, new SequenceWrapper() @@ -193,8 +191,7 @@ public void after(final boolean isDone, final Throwable thrown) * * @param baseQuery the query */ - @SuppressWarnings("unchecked") - public void initialize(final Query baseQuery) + public void initialize(final Query baseQuery) { transition(State.NEW, State.INITIALIZED); @@ -282,17 +279,18 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina * * @return result sequence and response context */ - public QueryResponse execute() + public QueryResponse execute() { transition(State.AUTHORIZED, State.EXECUTING); final ResponseContext responseContext = DirectDruidClient.makeResponseContextForQuery(); - final Sequence res = QueryPlus.wrap(baseQuery) + @SuppressWarnings("unchecked") + final Sequence res = QueryPlus.wrap((Query) baseQuery) .withIdentity(authenticationResult.getIdentity()) .run(texasRanger, responseContext); - return new QueryResponse(res == null ? Sequences.empty() : res, responseContext); + return new QueryResponse(res == null ? Sequences.empty() : res, responseContext); } /** diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index ea225efbae17..1a72cfc3b8ec 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -200,7 +200,7 @@ public Response doPost( throw new ForbiddenException(authResult.toString()); } - final QueryResponse queryResponse = queryLifecycle.execute(); + final QueryResponse queryResponse = queryLifecycle.execute(); final Sequence results = queryResponse.getResults(); final ResponseContext responseContext = queryResponse.getResponseContext(); final String prevEtag = getPreviousEtag(req); @@ -477,8 +477,8 @@ String getResponseType() } ObjectWriter newOutputWriter( - @Nullable QueryToolChest toolChest, - @Nullable Query query, + @Nullable QueryToolChest> toolChest, + @Nullable Query query, boolean serializeDateTimeAsLong ) { diff --git a/server/src/main/java/org/apache/druid/server/QueryResponse.java b/server/src/main/java/org/apache/druid/server/QueryResponse.java index 69908aee945c..d590c5014ef5 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResponse.java +++ b/server/src/main/java/org/apache/druid/server/QueryResponse.java @@ -22,23 +22,23 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.context.ResponseContext; -public class QueryResponse +public class QueryResponse { - public static QueryResponse withEmptyContext(Sequence results) - { - return new QueryResponse(results, ResponseContext.createEmpty()); - } - - private final Sequence results; + private final Sequence results; private final ResponseContext responseContext; - public QueryResponse(final Sequence results, final ResponseContext responseContext) + public QueryResponse(final Sequence results, final ResponseContext responseContext) { this.results = results; this.responseContext = responseContext; } - public Sequence getResults() + public static QueryResponse withEmptyContext(Sequence results) + { + return new QueryResponse(results, ResponseContext.createEmpty()); + } + + public Sequence getResults() { return results; } diff --git a/sql/src/main/codegen/includes/insert.ftl 
b/sql/src/main/codegen/includes/insert.ftl index 07898aaadc96..c0e04bc77245 100644 --- a/sql/src/main/codegen/includes/insert.ftl +++ b/sql/src/main/codegen/includes/insert.ftl @@ -38,7 +38,7 @@ SqlNode DruidSqlInsertEof() : ] { if (clusteredBy != null && partitionedBy.lhs == null) { - throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"); + throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"); } } // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times. diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl index ed9eee46deb3..ed8dbb10eed2 100644 --- a/sql/src/main/codegen/includes/replace.ftl +++ b/sql/src/main/codegen/includes/replace.ftl @@ -58,7 +58,7 @@ SqlNode DruidSqlReplaceEof() : ] { if (clusteredBy != null && partitionedBy.lhs == null) { - throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"); + throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"); } } // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times. diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java index e173213a0ccd..507216c2368b 100644 --- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java @@ -98,7 +98,7 @@ public boolean runnable() * Do the actual execute step which allows subclasses to wrap the sequence, * as is sometimes needed for testing. */ - public QueryResponse run() + public QueryResponse run() { try { // Check cancellation. Required for SqlResourceTest to work. @@ -176,7 +176,7 @@ public DirectStatement( * * @return sequence which delivers query results */ - public QueryResponse execute() + public QueryResponse execute() { return plan().run(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java new file mode 100644 index 000000000000..26f019e0a153 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.parser; + +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.druid.java.util.common.granularity.Granularity; + +import javax.annotation.Nullable; + +/** + * Common base class to the two Druid "ingest" statements: INSERT and REPLACE. + * Allows Planner code to work with these two statements generically where they + * share common clauses. + */ +public abstract class DruidSqlIngest extends SqlInsert +{ + protected final Granularity partitionedBy; + + // Used in the unparse function to generate the original query since we convert the string to an enum + protected final String partitionedByStringForUnparse; + + @Nullable + protected final SqlNodeList clusteredBy; + + public DruidSqlIngest(SqlParserPos pos, + SqlNodeList keywords, + SqlNode targetTable, + SqlNode source, + SqlNodeList columnList, + @Nullable Granularity partitionedBy, + @Nullable String partitionedByStringForUnparse, + @Nullable SqlNodeList clusteredBy + ) + { + super(pos, keywords, targetTable, source, columnList); + + this.partitionedByStringForUnparse = partitionedByStringForUnparse; + this.partitionedBy = partitionedBy; + this.clusteredBy = clusteredBy; + } + + public Granularity getPartitionedBy() + { + return partitionedBy; + } + + @Nullable + public SqlNodeList getClusteredBy() + { + return clusteredBy; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java index dbaace1d93a5..c2eeb2ed1e4d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java @@ -19,7 +19,6 @@ package org.apache.druid.sql.calcite.parser; -import com.google.common.base.Preconditions; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -32,24 +31,16 @@ /** * Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY - * This class extends the {@link SqlInsert} so that this SqlNode can be used in + * This class extends the {@link DruidSqlIngest} so that this SqlNode can be used in * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing */ -public class DruidSqlInsert extends SqlInsert +public class DruidSqlInsert extends DruidSqlIngest { public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity"; // This allows reusing super.unparse public static final SqlOperator OPERATOR = SqlInsert.OPERATOR; - private final Granularity partitionedBy; - - // Used in the unparse function to generate the original query since we convert the string to an enum - private final String partitionedByStringForUnparse; - - @Nullable - private final SqlNodeList clusteredBy; - /** * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is * disallowed (semantically) and the constructor performs checks to ensure that. 
This helps in producing friendly @@ -61,35 +52,18 @@ public DruidSqlInsert( @Nullable Granularity partitionedBy, @Nullable String partitionedByStringForUnparse, @Nullable SqlNodeList clusteredBy - ) throws ParseException + ) { super( insertNode.getParserPosition(), (SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this insertNode.getTargetTable(), insertNode.getSource(), - insertNode.getTargetColumnList() + insertNode.getTargetColumnList(), + partitionedBy, + partitionedByStringForUnparse, + clusteredBy ); - if (partitionedBy == null) { - throw new ParseException("INSERT statements must specify PARTITIONED BY clause explicitly"); - } - this.partitionedBy = partitionedBy; - - Preconditions.checkNotNull(partitionedByStringForUnparse); - this.partitionedByStringForUnparse = partitionedByStringForUnparse; - - this.clusteredBy = clusteredBy; - } - - @Nullable - public SqlNodeList getClusteredBy() - { - return clusteredBy; - } - - public Granularity getPartitionedBy() - { - return partitionedBy; } @Nonnull diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java index fb41bf7656f2..d527a08b59ec 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java @@ -19,7 +19,6 @@ package org.apache.druid.sql.calcite.parser; -import com.google.common.base.Preconditions; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; @@ -38,22 +37,14 @@ * This class extends the {@link SqlInsert} so that this SqlNode can be used in * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing */ -public class DruidSqlReplace extends SqlInsert +public class DruidSqlReplace extends DruidSqlIngest { public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks"; public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER); - private final Granularity partitionedBy; - - // Used in the unparse function to generate the original query since we convert the string to an enum - private final String partitionedByStringForUnparse; - private final SqlNode replaceTimeQuery; - @Nullable - private final SqlNodeList clusteredBy; - /** * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is * disallowed (semantically) and the constructor performs checks to ensure that. 
This helps in producing friendly @@ -66,28 +57,20 @@ public DruidSqlReplace( @Nullable String partitionedByStringForUnparse, @Nullable SqlNodeList clusteredBy, @Nullable SqlNode replaceTimeQuery - ) throws ParseException + ) { super( insertNode.getParserPosition(), (SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this insertNode.getTargetTable(), insertNode.getSource(), - insertNode.getTargetColumnList() + insertNode.getTargetColumnList(), + partitionedBy, + partitionedByStringForUnparse, + clusteredBy ); - if (replaceTimeQuery == null) { - throw new ParseException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."); - } - if (partitionedBy == null) { - throw new ParseException("REPLACE statements must specify PARTITIONED BY clause explicitly"); - } - this.partitionedBy = partitionedBy; - - this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse); this.replaceTimeQuery = replaceTimeQuery; - - this.clusteredBy = clusteredBy; } public SqlNode getReplaceTimeQuery() @@ -95,17 +78,6 @@ public SqlNode getReplaceTimeQuery() return replaceTimeQuery; } - public Granularity getPartitionedBy() - { - return partitionedBy; - } - - @Nullable - public SqlNodeList getClusteredBy() - { - return clusteredBy; - } - @Nonnull @Override public SqlOperator getOperator() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 75be75855c2c..68db56dd37c0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -19,90 +19,31 @@ package org.apache.druid.sql.calcite.planner; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import org.apache.calcite.DataContext; -import org.apache.calcite.interpreter.BindableConvention; -import org.apache.calcite.interpreter.BindableRel; -import org.apache.calcite.interpreter.Bindables; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.rel.RelVisitor; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.logical.LogicalSort; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlExplain; -import org.apache.calcite.sql.SqlIdentifier; -import 
org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.ValidationException; -import org.apache.calcite.util.Pair; -import org.apache.druid.common.utils.IdUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.Query; -import org.apache.druid.segment.DimensionHandlerUtils; -import org.apache.druid.server.QueryResponse; +import org.apache.druid.query.QueryContext; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; -import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; -import org.apache.druid.sql.calcite.rel.DruidConvention; -import org.apache.druid.sql.calcite.rel.DruidQuery; -import org.apache.druid.sql.calcite.rel.DruidRel; -import org.apache.druid.sql.calcite.rel.DruidUnionRel; -import org.apache.druid.sql.calcite.run.EngineFeature; -import org.apache.druid.sql.calcite.run.QueryMaker; import org.apache.druid.sql.calcite.run.SqlEngine; -import org.apache.druid.sql.calcite.table.DruidTable; -import org.apache.druid.utils.Throwables; import org.joda.time.DateTimeZone; -import javax.annotation.Nullable; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Set; import java.util.function.Function; -import java.util.regex.Pattern; -import java.util.stream.Collectors; /** * Druid SQL planner. Wraps the underlying Calcite planner with Druid-specific @@ -121,27 +62,13 @@ public enum State START, VALIDATED, PREPARED, PLANNED } - private static final EmittingLogger log = new EmittingLogger(DruidPlanner.class); - private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); - @VisibleForTesting - public static final String UNNAMED_INGESTION_COLUMN_ERROR = - "Cannot ingest expressions that do not have an alias " - + "or columns with names like EXPR$[digit].\n" - + "E.g. 
if you are ingesting \"func(X)\", then you can rewrite it as " - + "\"func(X) as myColumn\""; - private final FrameworkConfig frameworkConfig; private final CalcitePlanner planner; private final PlannerContext plannerContext; private final SqlEngine engine; private State state = State.START; - private ParsedNodes parsed; - private SqlNode validatedQueryNode; + private SqlStatementHandler handler; private boolean authorized; - private PrepareResult prepareResult; - private Set resourceActions; - private RelRoot rootQueryRel; - private RexBuilder rexBuilder; DruidPlanner( final FrameworkConfig frameworkConfig, @@ -170,80 +97,44 @@ public void validate() throws SqlParseException, ValidationException // Parse the query string. SqlNode root = planner.parse(plannerContext.getSql()); - parsed = ParsedNodes.create(root, plannerContext.getTimeZone()); - - if (parsed.isSelect() && !plannerContext.engineHasFeature(EngineFeature.CAN_SELECT)) { - throw new ValidationException(StringUtils.format("Cannot execute SELECT with SQL engine '%s'.", engine.name())); - } else if (parsed.isInsert() && !plannerContext.engineHasFeature(EngineFeature.CAN_INSERT)) { - throw new ValidationException(StringUtils.format("Cannot execute INSERT with SQL engine '%s'.", engine.name())); - } else if (parsed.isReplace() && !plannerContext.engineHasFeature(EngineFeature.CAN_REPLACE)) { - throw new ValidationException(StringUtils.format("Cannot execute REPLACE with SQL engine '%s'.", engine.name())); - } + handler = createHandler(root); try { - if (parsed.getIngestionGranularity() != null) { - plannerContext.getQueryContext().addSystemParam( - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity()) - ); - } - } - catch (JsonProcessingException e) { - throw new ValidationException("Unable to serialize partition granularity."); - } - - if (parsed.getReplaceIntervals() != null) { - plannerContext.getQueryContext().addSystemParam( - DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, - String.join(",", parsed.getReplaceIntervals()) - ); - } - - try { - // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any - // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} - // replacement. - // - // Parameter replacement is done only if the client provides parameter values. - // If this is a PREPARE-only, then there will be no values even if the statement contains - // parameters. If this is a PLAN, then we'll catch later the case that the statement - // contains parameters, but no values were provided. - SqlNode queryNode = parsed.getQueryNode(); - if (!plannerContext.getParameters().isEmpty()) { - queryNode = queryNode.accept(new SqlParameterizerShuttle(plannerContext)); - } - validatedQueryNode = planner.validate(queryNode); + handler.validate(); + plannerContext.setResourceActions(handler.resourceActions()); } catch (RuntimeException e) { throw new ValidationException(e); } - final SqlValidator validator = planner.getValidator(); - SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(validator, plannerContext); - validatedQueryNode.accept(resourceCollectorShuttle); - - resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions()); + state = State.VALIDATED; + } - if (parsed.isInsert() || parsed.isReplace()) { - // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. 
CTX_SQL_OUTER_LIMIT being provided causes - // the number of rows inserted to be limited which is likely to be confusing and unintended. - if (plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) { - throw new ValidationException( - StringUtils.format( - "%s cannot be provided with %s.", - PlannerContext.CTX_SQL_OUTER_LIMIT, - parsed.getInsertOrReplace().getOperator().getName() - ) - ); + private SqlStatementHandler createHandler(final SqlNode node) throws ValidationException + { + SqlNode query = node; + SqlExplain explain = null; + if (query.getKind() == SqlKind.EXPLAIN) { + explain = (SqlExplain) query; + query = explain.getExplicandum(); + } + + SqlStatementHandler.HandlerContext handlerContext = new HandlerContextImpl(); + if (query.getKind() == SqlKind.INSERT) { + if (query instanceof DruidSqlInsert) { + return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) query, explain); + } else if (query instanceof DruidSqlReplace) { + return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) query, explain); } - final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace()); - resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE)); } - state = State.VALIDATED; - plannerContext.setResourceActions(resourceActions); + if (query.isA(SqlKind.QUERY)) { + return new QueryHandler.SelectHandler(handlerContext, query, explain); + } + throw new ValidationException(StringUtils.format("Cannot execute [%s].", node.getKind())); } + /** * Prepare a SQL query for execution, including some initial parsing and * validation and any dynamic parameter type resolution, to support prepared @@ -260,30 +151,9 @@ public void validate() throws SqlParseException, ValidationException public PrepareResult prepare() { Preconditions.checkState(state == State.VALIDATED); - - rootQueryRel = planner.rel(validatedQueryNode); - doPrepare(); + handler.prepare(); state = State.PREPARED; - return prepareResult; - } - - private void doPrepare() - { - final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); - final SqlValidator validator = planner.getValidator(); - final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode); - final RelDataType returnedRowType; - - if (parsed.getExplainNode() != null) { - returnedRowType = getExplainStructType(typeFactory); - } else if (parsed.isSelect()) { - returnedRowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType); - } else { - assert parsed.insertOrReplace != null; - returnedRowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType); - } - - prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes); + return prepareResult(); } /** @@ -309,14 +179,14 @@ public Access authorize(Function, Access> authorizer, boolea /** * Return the resource actions corresponding to the datasources and views which - * an authenticated request must be authorized for to process the - * query. The actions will be {@code null} if the - * planner has not yet advanced to the validation step. This may occur if - * validation fails and the caller accesses the resource - * actions as part of clean-up. + * an authenticated request must be authorized for to process the query. The + * actions will be {@code null} if the planner has not yet advanced to the + * validation step. 
This may occur if validation fails and the caller accesses + * the resource actions as part of clean-up. */ public Set resourceActions(boolean includeContext) { + Set resourceActions = plannerContext.getResourceActions(); if (includeContext) { Set actions = new HashSet<>(resourceActions); plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> actions.add( @@ -330,69 +200,18 @@ public Set resourceActions(boolean includeContext) /** * Plan an SQL query for execution, returning a {@link PlannerResult} which can be used to actually execute the query. - * - * Ideally, the query can be planned into a native Druid query, using {@link #planWithDruidConvention}, but will - * fall-back to {@link #planWithBindableConvention} if this is not possible. - * - * Planning reuses the validation done in `validate()` which must be called first. + *
+ * Ideally, the query can be planned into a native Druid query, but will
+ * fall back to bindable convention if this is not possible.
+ *
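+ * Bindable convention is a general-purpose (but not optimized) fallback used for
+ * things that are not directly translatable to native Druid queries, such as
+ * system and metadata tables.
+ *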
+ * Planning reuses the validation done in {@code validate()} which must be called first. */ public PlannerResult plan() throws ValidationException { Preconditions.checkState(state == State.VALIDATED || state == State.PREPARED); Preconditions.checkState(authorized); - if (state == State.VALIDATED) { - rootQueryRel = planner.rel(validatedQueryNode); - } - - final Set bindableTables = getBindableTables(rootQueryRel.rel); - - // the planner's type factory is not available until after parsing - this.rexBuilder = new RexBuilder(planner.getTypeFactory()); state = State.PLANNED; - - try { - if (!bindableTables.isEmpty()) { - // Consider BINDABLE convention when necessary. Used for metadata tables. - - if (parsed.isInsert() || parsed.isReplace()) { - // Throws ValidationException if the target table is itself bindable. - validateAndGetDataSourceForIngest(parsed.getInsertOrReplace()); - } - - if (!plannerContext.engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) { - throw new ValidationException( - StringUtils.format( - "Cannot query table%s [%s] with SQL engine '%s'.", - bindableTables.size() != 1 ? "s" : "", - bindableTables.stream() - .map(table -> Joiner.on(".").join(table.getQualifiedName())) - .collect(Collectors.joining(", ")), - engine.name() - ) - ); - } - - return planWithBindableConvention(rootQueryRel, parsed.getExplainNode()); - } else { - // DRUID convention is used whenever there are no tables that require BINDABLE. - return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace()); - } - } - catch (Exception e) { - Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class); - if (null == cannotPlanException) { - // Not a CannotPlanException, rethrow without logging. - throw e; - } - - Logger logger = log; - if (!plannerContext.getQueryContext().isDebug()) { - logger = log.noStackTrace(); - } - String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException); - logger.warn(e, errorMessage); - throw new UnsupportedSQLQueryException(errorMessage); - } + return handler.plan(); } public PlannerContext getPlannerContext() @@ -402,7 +221,7 @@ public PlannerContext getPlannerContext() public PrepareResult prepareResult() { - return prepareResult; + return handler.prepareResult(); } @Override @@ -411,647 +230,48 @@ public void close() planner.close(); } - /** - * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query. - */ - private PlannerResult planWithDruidConvention( - final RelRoot root, - @Nullable final SqlExplain explain, - @Nullable final SqlInsert insertOrReplace - ) throws ValidationException + protected class HandlerContextImpl implements SqlStatementHandler.HandlerContext { - final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root); - final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot, insertOrReplace); - plannerContext.setQueryMaker(queryMaker); - if (prepareResult == null) { - doPrepare(); - } - - // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle} - // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to - // successfully substitute all parameter values, and will cause a failure if any - // dynamic a parameters are not bound. This occurs at least for DATE parameters - // with integer values. - // - // This check also catches the case where we did not do a parameter check earlier - // because no values were provided. 
(Values are not required in the PREPARE case - // but now that we're planning, we require them.) - RelNode parameterized = possiblyLimitedRoot.rel.accept( - new RelParameterizerShuttle(plannerContext) - ); - final DruidRel druidRel = (DruidRel) planner.transform( - CalciteRulesManager.DRUID_CONVENTION_RULES, - planner.getEmptyTraitSet() - .replace(DruidConvention.instance()) - .plus(root.collation), - parameterized - ); - - if (explain != null) { - return planExplanation(druidRel, explain, true); - } else { - // Compute row type. - final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); - final RelDataType rowType; - - if (parsed.isSelect()) { - rowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType); - } else { - assert parsed.insertOrReplace != null; - rowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType); - } - - // Start the query. - final Supplier resultsSupplier = () -> { - // sanity check - final Set readResourceActions = - plannerContext.getResourceActions() - .stream() - .filter(action -> action.getAction() == Action.READ) - .collect(Collectors.toSet()); - Preconditions.checkState( - readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty() - // The resources found in the plannerContext can be less than the datasources in - // the query plan, because the query planner can eliminate empty tables by replacing - // them with InlineDataSource of empty rows. - || readResourceActions.size() >= druidRel.getDataSourceNames().size(), - "Authorization sanity check failed" - ); - - return druidRel.runQuery(); - }; - - return new PlannerResult(resultsSupplier, rowType); - } - } - - /** - * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for - * things that are not directly translatable to native Druid queries such - * as system tables and just a general purpose (but definitely not optimized) - * fall-back. - * - * See {@link #planWithDruidConvention} which will handle things which are - * directly translatable to native Druid queries. - * - * The bindable path handles parameter substitution of any values not - * bound by the earlier steps. - */ - private PlannerResult planWithBindableConvention( - final RelRoot root, - @Nullable final SqlExplain explain - ) - { - if (prepareResult == null) { - doPrepare(); - } - - BindableRel bindableRel = (BindableRel) planner.transform( - CalciteRulesManager.BINDABLE_CONVENTION_RULES, - planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation), - root.rel - ); - - if (!root.isRefTrivial()) { - // Add a projection on top to accommodate root.fields. 
- final List projects = new ArrayList<>(); - final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder(); - for (int field : Pair.left(root.fields)) { - projects.add(rexBuilder.makeInputRef(bindableRel, field)); - } - bindableRel = new Bindables.BindableProject( - bindableRel.getCluster(), - bindableRel.getTraitSet(), - bindableRel, - projects, - root.validatedRowType - ); - } - - if (explain != null) { - return planExplanation(bindableRel, explain, false); - } else { - final BindableRel theRel = bindableRel; - final DataContext dataContext = plannerContext.createDataContext( - planner.getTypeFactory(), - plannerContext.getParameters() - ); - final Supplier resultsSupplier = () -> { - final Enumerable enumerable = theRel.bind(dataContext); - final Enumerator enumerator = enumerable.enumerator(); - return QueryResponse.withEmptyContext(Sequences.withBaggage( - new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public EnumeratorIterator make() - { - return new EnumeratorIterator<>(new Iterator() - { - @Override - public boolean hasNext() - { - return enumerator.moveNext(); - } - - @Override - public Object[] next() - { - return (Object[]) enumerator.current(); - } - }); - } - - @Override - public void cleanup(EnumeratorIterator iterFromMake) - { - - } - } - ), enumerator::close) - ); - }; - return new PlannerResult(resultsSupplier, root.validatedRowType); - } - } - - /** - * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode} - */ - private PlannerResult planExplanation( - final RelNode rel, - final SqlExplain explain, - final boolean isDruidConventionExplanation - ) - { - String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel()); - String resourcesString; - try { - if (isDruidConventionExplanation && rel instanceof DruidRel) { - // Show the native queries instead of Calcite's explain if the legacy flag is turned off - if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) { - DruidRel druidRel = (DruidRel) rel; - try { - explanation = explainSqlPlanAsNativeQueries(druidRel); - } - catch (Exception ex) { - log.warn(ex, "Unable to translate to a native Druid query. 
Resorting to legacy Druid explain plan"); - } - } - } - final Set resources = - plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet()); - resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources); - } - catch (JsonProcessingException jpe) { - // this should never happen, we create the Resources here, not a user - log.error(jpe, "Encountered exception while serializing Resources for explain output"); - resourcesString = null; - } - final Supplier resultsSupplier = Suppliers.ofInstance( - QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))) - ); - return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory())); - } - - /** - * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose - * and not indicative of the native Druid Queries which will get executed - * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it - * - * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it - * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format - * @throws JsonProcessingException - */ - private String explainSqlPlanAsNativeQueries(DruidRel rel) throws JsonProcessingException - { - ObjectMapper jsonMapper = plannerContext.getJsonMapper(); - List druidQueryList; - druidQueryList = flattenOutermostRel(rel) - .stream() - .map(druidRel -> druidRel.toDruidQuery(false)) - .collect(Collectors.toList()); - - - // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when - // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then - // serializing it using normal list method. - ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode(); - - for (DruidQuery druidQuery : druidQueryList) { - Query nativeQuery = druidQuery.getQuery(); - ObjectNode objectNode = jsonMapper.createObjectNode(); - objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class)); - objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class)); - nativeQueriesArrayNode.add(objectNode); - } - - return jsonMapper.writeValueAsString(nativeQueriesArrayNode); - } - - /** - * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel} - * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel} - * node - * For eg, a DruidRel structure of kind: - * DruidUnionRel - * DruidUnionRel - * DruidRel (A) - * DruidRel (B) - * DruidRel(C) - * will return [DruidRel(A), DruidRel(B), DruidRel(C)] - * - * @param outermostDruidRel The outermost rel which is to be flattened - * @return a list of DruidRel's which donot have a DruidUnionRel nested in between them - */ - private List> flattenOutermostRel(DruidRel outermostDruidRel) - { - List> druidRels = new ArrayList<>(); - flattenOutermostRel(outermostDruidRel, druidRels); - return druidRels; - } - - /** - * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if - * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. 
The DFS order of the - * nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()} - * - * @param druidRel The current relNode - * @param flattendListAccumulator Accumulator list which needs to be appended by this method - */ - private void flattenOutermostRel(DruidRel druidRel, List> flattendListAccumulator) - { - if (druidRel instanceof DruidUnionRel) { - DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel; - druidUnionRel.getInputs().forEach(innerRelNode -> { - DruidRel innerDruidRelNode = (DruidRel) innerRelNode; // This type conversion should always be possible - flattenOutermostRel(innerDruidRelNode, flattendListAccumulator); - }); - } else { - flattendListAccumulator.add(druidRel); - } - } - - /** - * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel - * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in - * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}. - * - * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by - * the web console, allowing it to apply a limit to queries without rewriting the original SQL. - * - * @param root root node - * - * @return root node wrapped with a limiting logical sort if a limit is specified in the query context. - */ - @Nullable - private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root) - { - Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT); - Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true); - if (outerLimit == null) { - return root; - } - - final LogicalSort newRootRel; - - if (root.rel instanceof Sort) { - Sort sort = (Sort) root.rel; - - final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort); - final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit)); - - if (newOffsetLimit.equals(originalOffsetLimit)) { - // nothing to do, don't bother to make a new sort - return root; - } - - newRootRel = LogicalSort.create( - sort.getInput(), - sort.collation, - newOffsetLimit.getOffsetAsRexNode(rexBuilder), - newOffsetLimit.getLimitAsRexNode(rexBuilder) - ); - } else { - newRootRel = LogicalSort.create( - root.rel, - root.collation, - null, - new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder) - ); - } - - return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation); - } - - private QueryMaker buildQueryMaker( - final RelRoot rootQueryRel, - @Nullable final SqlInsert insertOrReplace - ) throws ValidationException - { - if (insertOrReplace != null) { - final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace); - validateColumnsForIngestion(rootQueryRel); - return engine.buildQueryMakerForInsert(targetDataSource, rootQueryRel, plannerContext); - } else { - return engine.buildQueryMakerForSelect(rootQueryRel, plannerContext); - } - } - - private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory) - { - return typeFactory.createStructType( - ImmutableList.of( - Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR), - Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR) - ), - ImmutableList.of("PLAN", "RESOURCES") - ); - } - - /** - * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support. 
- * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema. - */ - private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException - { - final String operatorName = insert.getOperator().getName(); - if (insert.isUpsert()) { - throw new ValidationException("UPSERT is not supported."); - } - - if (insert.getTargetColumnList() != null) { - throw new ValidationException(operatorName + " with target column list is not supported."); - } - - final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); - final String dataSource; - - if (tableIdentifier.names.isEmpty()) { - // I don't think this can happen, but include a branch for it just in case. - throw new ValidationException(operatorName + " requires target table."); - } else if (tableIdentifier.names.size() == 1) { - // Unqualified name. - dataSource = Iterables.getOnlyElement(tableIdentifier.names); - } else { - // Qualified name. - final String defaultSchemaName = - Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null)); - - if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) { - dataSource = tableIdentifier.names.get(1); - } else { - throw new ValidationException( - StringUtils.format( - "Cannot %s into [%s] because it is not a Druid datasource (schema = %s).", - operatorName, - tableIdentifier, - defaultSchemaName - ) - ); - } - } - - try { - IdUtils.validateId(operatorName + " dataSource", dataSource); - } - catch (IllegalArgumentException e) { - throw new ValidationException(e.getMessage()); - } - - return dataSource; - } - - private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException - { - // Check that there are no unnamed columns in the insert. - for (Pair field : rootQueryRel.fields) { - if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) { - throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR); - } - } - } - - private String buildSQLPlanningErrorMessage(Throwable exception) - { - String errorMessage = plannerContext.getPlanningError(); - if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) { - errorMessage = exception.getMessage(); - } - if (null == errorMessage) { - errorMessage = "Please check Broker logs for additional details."; - } else { - // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. - errorMessage = "Possible error: " + errorMessage; - } - // Finally, add the query itself to error message that user will get. - return StringUtils.format("Query not supported. 
%s SQL was: %s", errorMessage, plannerContext.getSql()); - } - - private static Set getBindableTables(final RelNode relNode) - { - class HasBindableVisitor extends RelVisitor - { - private final Set found = new HashSet<>(); - - @Override - public void visit(RelNode node, int ordinal, RelNode parent) - { - if (node instanceof TableScan) { - RelOptTable table = node.getTable(); - if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) { - found.add(table); - return; - } - } - - super.visit(node, ordinal, parent); - } - } - - final HasBindableVisitor visitor = new HasBindableVisitor(); - visitor.go(relNode); - return visitor.found; - } - - private static class EnumeratorIterator implements Iterator - { - private final Iterator it; - - EnumeratorIterator(Iterator it) - { - this.it = it; - } - @Override - public boolean hasNext() + public PlannerContext plannerContext() { - return it.hasNext(); + return plannerContext; } @Override - public T next() - { - return it.next(); - } - } - - private static class ParsedNodes - { - @Nullable - private final SqlExplain explain; - - @Nullable - private final SqlInsert insertOrReplace; - - private final SqlNode query; - - @Nullable - private final Granularity ingestionGranularity; - - @Nullable - private final List replaceIntervals; - - private ParsedNodes( - @Nullable SqlExplain explain, - @Nullable SqlInsert insertOrReplace, - SqlNode query, - @Nullable Granularity ingestionGranularity, - @Nullable List replaceIntervals - ) - { - this.explain = explain; - this.insertOrReplace = insertOrReplace; - this.query = query; - this.ingestionGranularity = ingestionGranularity; - this.replaceIntervals = replaceIntervals; - } - - static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException + public SqlEngine engine() { - SqlNode query = node; - SqlExplain explain = null; - if (query.getKind() == SqlKind.EXPLAIN) { - explain = (SqlExplain) query; - query = explain.getExplicandum(); - } - - if (query.getKind() == SqlKind.INSERT) { - if (query instanceof DruidSqlInsert) { - return handleInsert(explain, (DruidSqlInsert) query); - } else if (query instanceof DruidSqlReplace) { - return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone); - } - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, null, query, null, null); + return engine; } - static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException - { - SqlNode query = druidSqlInsert.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead."); - } - } - - Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy(); - - if (druidSqlInsert.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy()); - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null); - } - - static ParsedNodes handleReplace(SqlExplain 
explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone) - throws ValidationException - { - SqlNode query = druidSqlReplace.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead."); - } - } - - SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery(); - if (replaceTimeQuery == null) { - throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."); - } - - Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy(); - List replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone); - - if (druidSqlReplace.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy()); - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals); - } - - @Nullable - public SqlExplain getExplainNode() - { - return explain; - } - - public boolean isSelect() - { - return insertOrReplace == null; - } - - public boolean isInsert() - { - return insertOrReplace != null && !isReplace(); - } - - public boolean isReplace() + @Override + public CalcitePlanner planner() { - return insertOrReplace instanceof DruidSqlReplace; + return planner; } - @Nullable - public SqlInsert getInsertOrReplace() + @Override + public QueryContext queryContext() { - return insertOrReplace; + return plannerContext.getQueryContext(); } - @Nullable - public List getReplaceIntervals() + @Override + public SchemaPlus defaultSchema() { - return replaceIntervals; + return frameworkConfig.getDefaultSchema(); } - public SqlNode getQueryNode() + @Override + public ObjectMapper jsonMapper() { - return query; + return plannerContext.getJsonMapper(); } - @Nullable - public Granularity getIngestionGranularity() + @Override + public DateTimeZone timeZone() { - return ingestionGranularity; + return plannerContext.getTimeZone(); } } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java new file mode 100644 index 000000000000..e59b8cf5e7f6 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlExplain; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.calcite.parser.DruidSqlIngest; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; +import org.apache.druid.sql.calcite.parser.DruidSqlReplace; +import org.apache.druid.sql.calcite.run.EngineFeature; +import org.apache.druid.sql.calcite.run.QueryMaker; + +import java.util.List; +import java.util.regex.Pattern; + +public abstract class IngestHandler extends QueryHandler +{ + private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); + @VisibleForTesting + public static final String UNNAMED_INGESTION_COLUMN_ERROR = + "Cannot ingest expressions that do not have an alias " + + "or columns with names like EXPR$[digit].\n" + + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as " + + "\"func(X) as myColumn\""; + + protected final Granularity ingestionGranularity; + protected String targetDatasource; + + IngestHandler( + HandlerContext handlerContext, + DruidSqlIngest ingestNode, + SqlNode queryNode, + SqlExplain explain + ) + { + super(handlerContext, queryNode, explain); + this.ingestionGranularity = ingestNode.getPartitionedBy(); + } + + protected static SqlNode convertQuery(DruidSqlIngest sqlNode) throws ValidationException + { + SqlNode query = sqlNode.getSource(); + + // Check if ORDER BY clause is not provided to the underlying query + if (query instanceof SqlOrderBy) { + SqlOrderBy sqlOrderBy = (SqlOrderBy) query; + SqlNodeList orderByList = sqlOrderBy.orderList; + if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { + String opName = sqlNode.getOperator().getName(); + throw new ValidationException(StringUtils.format( + "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.", + "INSERT".equals(opName) ? 
"an" : "a", + opName + )); + } + } + if (sqlNode.getClusteredBy() != null) { + query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); + } + + if (!query.isA(SqlKind.QUERY)) { + throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); + } + return query; + } + + protected String operationName() + { + return ingestNode().getOperator().getName(); + } + + protected abstract DruidSqlIngest ingestNode(); + + @Override + public void validate() throws ValidationException + { + if (ingestNode().getPartitionedBy() == null) { + throw new ValidationException(StringUtils.format( + "%s statements must specify PARTITIONED BY clause explicitly", + operationName() + )); + } + try { + PlannerContext plannerContext = handlerContext.plannerContext(); + if (ingestionGranularity != null) { + plannerContext.getQueryContext().addSystemParam( + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, + plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity) + ); + } + } + catch (JsonProcessingException e) { + throw new ValidationException("Unable to serialize partition granularity."); + } + super.validate(); + // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes + // the number of rows inserted to be limited which is likely to be confusing and unintended. + if (handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) { + throw new ValidationException( + StringUtils.format( + "%s cannot be provided with %s.", + PlannerContext.CTX_SQL_OUTER_LIMIT, + operationName() + ) + ); + } + targetDatasource = validateAndGetDataSourceForIngest(); + resourceActions.add(new ResourceAction(new Resource(targetDatasource, ResourceType.DATASOURCE), Action.WRITE)); + } + + @Override + protected RelDataType returnedRowType() + { + final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); + return handlerContext.engine().resultTypeForInsert( + typeFactory, + rootQueryRel.validatedRowType); + } + + /** + * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support. + * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema. + */ + private String validateAndGetDataSourceForIngest() throws ValidationException + { + final SqlInsert insert = ingestNode(); + if (insert.isUpsert()) { + throw new ValidationException("UPSERT is not supported."); + } + + if (insert.getTargetColumnList() != null) { + throw new ValidationException(operationName() + " with a target column list is not supported."); + } + + final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); + final String dataSource; + + if (tableIdentifier.names.isEmpty()) { + // I don't think this can happen, but include a branch for it just in case. + throw new ValidationException(operationName() + " requires a target table."); + } else if (tableIdentifier.names.size() == 1) { + // Unqualified name. + dataSource = Iterables.getOnlyElement(tableIdentifier.names); + } else { + // Qualified name. 
+ final String defaultSchemaName = + Iterables.getOnlyElement(CalciteSchema.from(handlerContext.defaultSchema()).path(null)); + + if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) { + dataSource = tableIdentifier.names.get(1); + } else { + throw new ValidationException( + StringUtils.format( + "Cannot %s into %s because it is not a Druid datasource.", + operationName(), + tableIdentifier + ) + ); + } + } + + try { + IdUtils.validateId(operationName() + " dataSource", dataSource); + } + catch (IllegalArgumentException e) { + throw new ValidationException(e.getMessage()); + } + + return dataSource; + } + + @Override + protected PlannerResult planForDruid() throws ValidationException + { + return planWithDruidConvention(); + } + + @Override + protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException + { + validateColumnsForIngestion(rootQueryRel); + return handlerContext.engine().buildQueryMakerForInsert( + targetDatasource, + rootQueryRel, + handlerContext.plannerContext()); + } + + private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException + { + // Check that there are no unnamed columns in the insert. + for (Pair field : rootQueryRel.fields) { + if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) { + throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR); + } + } + } + + /** + * Handler for the INSERT statement. + */ + protected static class InsertHandler extends IngestHandler + { + private final DruidSqlInsert sqlNode; + + public InsertHandler( + SqlStatementHandler.HandlerContext handlerContext, + DruidSqlInsert sqlNode, + SqlExplain explain + ) throws ValidationException + { + super( + handlerContext, + sqlNode, + convertQuery(sqlNode), + explain); + this.sqlNode = sqlNode; + } + + @Override + public SqlNode sqlNode() + { + return sqlNode; + } + + @Override + protected DruidSqlIngest ingestNode() + { + return sqlNode; + } + + @Override + public void validate() throws ValidationException + { + if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_INSERT)) { + throw new ValidationException(StringUtils.format( + "Cannot execute INSERT with SQL engine '%s'.", + handlerContext.engine().name()) + ); + } + super.validate(); + } + } + + /** + * Handler for the REPLACE statement. + */ + protected static class ReplaceHandler extends IngestHandler + { + private final DruidSqlReplace sqlNode; + private List replaceIntervals; + + public ReplaceHandler( + SqlStatementHandler.HandlerContext handlerContext, + DruidSqlReplace sqlNode, + SqlExplain explain + ) throws ValidationException + { + super( + handlerContext, + sqlNode, + convertQuery(sqlNode), + explain + ); + this.sqlNode = sqlNode; + } + + @Override + public SqlNode sqlNode() + { + return sqlNode; + } + + @Override + protected DruidSqlIngest ingestNode() + { + return sqlNode; + } + + @Override + public void validate() throws ValidationException + { + if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_REPLACE)) { + throw new ValidationException(StringUtils.format( + "Cannot execute REPLACE with SQL engine '%s'.", + handlerContext.engine().name()) + ); + } + SqlNode replaceTimeQuery = sqlNode.getReplaceTimeQuery(); + if (replaceTimeQuery == null) { + throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE. 
Use " + + "OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."); + } + + replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals( + replaceTimeQuery, + ingestionGranularity, + handlerContext.timeZone()); + super.validate(); + if (replaceIntervals != null) { + handlerContext.queryContext().addSystemParam( + DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, + String.join(",", replaceIntervals) + ); + } + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java index 57e3ba2cbc9f..cbb07ddb48a4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java @@ -32,12 +32,12 @@ */ public class PlannerResult { - private final Supplier resultsSupplier; + private final Supplier> resultsSupplier; private final RelDataType rowType; private final AtomicBoolean didRun = new AtomicBoolean(); public PlannerResult( - final Supplier resultsSupplier, + final Supplier> resultsSupplier, final RelDataType rowType ) { @@ -53,7 +53,7 @@ public boolean runnable() /** * Run the query */ - public QueryResponse run() + public QueryResponse run() { if (!didRun.compareAndSet(false, true)) { // Safety check. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java new file mode 100644 index 000000000000..1d6a71b54271 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.DataContext; +import org.apache.calcite.interpreter.BindableConvention; +import org.apache.calcite.interpreter.BindableRel; +import org.apache.calcite.interpreter.Bindables; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.RelVisitor; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.sql.SqlExplain; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.BaseSequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.Query; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.rel.DruidConvention; +import org.apache.druid.sql.calcite.rel.DruidQuery; +import org.apache.druid.sql.calcite.rel.DruidRel; +import org.apache.druid.sql.calcite.rel.DruidUnionRel; +import org.apache.druid.sql.calcite.run.EngineFeature; +import org.apache.druid.sql.calcite.run.QueryMaker; +import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.utils.Throwables; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Abstract base class for handlers that revolve around queries: SELECT, + * INSERT and REPLACE. This class handles the common SELECT portion of the statement. 
+ */ +public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHandler +{ + static final EmittingLogger log = new EmittingLogger(QueryHandler.class); + + protected SqlNode queryNode; + protected SqlExplain explain; + protected SqlNode validatedQueryNode; + private boolean isPrepared; + protected RelRoot rootQueryRel; + private PrepareResult prepareResult; + protected RexBuilder rexBuilder; + + public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain) + { + super(handlerContext); + this.queryNode = sqlNode; + this.explain = explain; + } + + @Override + public void validate() throws ValidationException + { + CalcitePlanner planner = handlerContext.planner(); + validatedQueryNode = planner.validate(rewriteParameters()); + + final SqlValidator validator = planner.getValidator(); + SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle( + validator, + handlerContext.plannerContext() + ); + validatedQueryNode.accept(resourceCollectorShuttle); + resourceActions = resourceCollectorShuttle.getResourceActions(); + } + + private SqlNode rewriteParameters() + { + // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any + // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} + // replacement. + // + // Parameter replacement is done only if the client provides parameter values. + // If this is a PREPARE-only, then there will be no values even if the statement contains + // parameters. If this is a PLAN, then we'll catch later the case that the statement + // contains parameters, but no values were provided. + PlannerContext plannerContext = handlerContext.plannerContext(); + if (plannerContext.getParameters().isEmpty()) { + return queryNode; + } else { + return queryNode.accept(new SqlParameterizerShuttle(plannerContext)); + } + } + + @Override + public void prepare() + { + if (isPrepared) { + return; + } + isPrepared = true; + rootQueryRel = handlerContext.planner().rel(validatedQueryNode); + final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); + final SqlValidator validator = handlerContext.planner().getValidator(); + final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode); + final RelDataType returnedRowType; + + if (explain != null) { + returnedRowType = getExplainStructType(typeFactory); + } else { + returnedRowType = returnedRowType(); + } + + prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes); + } + + @Override + public PrepareResult prepareResult() + { + return prepareResult; + } + + protected abstract RelDataType returnedRowType(); + + private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory) + { + return typeFactory.createStructType( + ImmutableList.of( + Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR), + Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR) + ), + ImmutableList.of("PLAN", "RESOURCES") + ); + } + + @Override + public PlannerResult plan() throws ValidationException + { + prepare(); + final Set bindableTables = getBindableTables(rootQueryRel.rel); + + // the planner's type factory is not available until after parsing + rexBuilder = new RexBuilder(handlerContext.planner().getTypeFactory()); + + try { + if (!bindableTables.isEmpty()) { + // Consider BINDABLE convention when necessary. Used for metadata tables. 
+ + if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) { + throw new ValidationException( + StringUtils.format( + "Cannot query table%s %s with SQL engine '%s'.", + bindableTables.size() != 1 ? "s" : "", + bindableTables.stream() + .map(table -> Joiner.on(".").join(table.getQualifiedName())) + .collect(Collectors.joining(", ")), + handlerContext.engine().name() + ) + ); + } + + return planWithBindableConvention(); + } else { + // Druid convention is used whenever there are no tables that require BINDABLE. + return planForDruid(); + } + } + catch (Exception e) { + Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class); + if (null == cannotPlanException) { + // Not a CannotPlanException, rethrow without logging. + throw e; + } + + Logger logger = log; + if (!handlerContext.queryContext().isDebug()) { + logger = log.noStackTrace(); + } + String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException); + logger.warn(e, errorMessage); + throw new UnsupportedSQLQueryException(errorMessage); + } + } + + private static Set getBindableTables(final RelNode relNode) + { + class HasBindableVisitor extends RelVisitor + { + private final Set found = new HashSet<>(); + + @Override + public void visit(RelNode node, int ordinal, RelNode parent) + { + if (node instanceof TableScan) { + RelOptTable table = node.getTable(); + if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) { + found.add(table); + return; + } + } + + super.visit(node, ordinal, parent); + } + } + + final HasBindableVisitor visitor = new HasBindableVisitor(); + visitor.go(relNode); + return visitor.found; + } + + /** + * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for + * things that are not directly translatable to native Druid queries such + * as system tables and just a general purpose (but definitely not optimized) + * fall-back. + * + * See {@link #planWithDruidConvention} which will handle things which are + * directly translatable to native Druid queries. + * + * The bindable path handles parameter substitution of any values not + * bound by the earlier steps. + */ + private PlannerResult planWithBindableConvention() + { + CalcitePlanner planner = handlerContext.planner(); + BindableRel bindableRel = (BindableRel) planner.transform( + CalciteRulesManager.BINDABLE_CONVENTION_RULES, + planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(rootQueryRel.collation), + rootQueryRel.rel + ); + + if (!rootQueryRel.isRefTrivial()) { + // Add a projection on top to accommodate root.fields. 
+ final List projects = new ArrayList<>(); + final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder(); + for (int field : Pair.left(rootQueryRel.fields)) { + projects.add(rexBuilder.makeInputRef(bindableRel, field)); + } + bindableRel = new Bindables.BindableProject( + bindableRel.getCluster(), + bindableRel.getTraitSet(), + bindableRel, + projects, + rootQueryRel.validatedRowType + ); + } + + PlannerContext plannerContext = handlerContext.plannerContext(); + if (explain != null) { + return planExplanation(bindableRel, false); + } else { + final BindableRel theRel = bindableRel; + final DataContext dataContext = plannerContext.createDataContext( + planner.getTypeFactory(), + plannerContext.getParameters() + ); + final Supplier> resultsSupplier = () -> { + final Enumerable enumerable = theRel.bind(dataContext); + final Enumerator enumerator = enumerable.enumerator(); + return QueryResponse.withEmptyContext( + Sequences.withBaggage(new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public QueryHandler.EnumeratorIterator make() + { + return new QueryHandler.EnumeratorIterator<>(new Iterator() + { + @Override + public boolean hasNext() + { + return enumerator.moveNext(); + } + + @Override + public Object[] next() + { + return (Object[]) enumerator.current(); + } + }); + } + + @Override + public void cleanup(QueryHandler.EnumeratorIterator iterFromMake) + { + + } + } + ), enumerator::close) + ); + }; + return new PlannerResult(resultsSupplier, rootQueryRel.validatedRowType); + } + } + + /** + * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode} + */ + protected PlannerResult planExplanation( + final RelNode rel, + final boolean isDruidConventionExplanation + ) + { + PlannerContext plannerContext = handlerContext.plannerContext(); + String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel()); + String resourcesString; + try { + if (isDruidConventionExplanation && rel instanceof DruidRel) { + // Show the native queries instead of Calcite's explain if the legacy flag is turned off + if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) { + DruidRel druidRel = (DruidRel) rel; + try { + explanation = explainSqlPlanAsNativeQueries(druidRel); + } + catch (Exception ex) { + log.warn(ex, "Unable to translate to a native Druid query. 
Resorting to legacy Druid explain plan."); + } + } + } + final Set resources = + plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet()); + resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources); + } + catch (JsonProcessingException jpe) { + // this should never happen, we create the Resources here, not a user + log.error(jpe, "Encountered exception while serializing resources for explain output"); + resourcesString = null; + } + final Supplier> resultsSupplier = Suppliers.ofInstance( + QueryResponse.withEmptyContext( + Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})) + ) + ); + return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory())); + } + + /** + * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose + * and not indicative of the native Druid Queries which will get executed + * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it + * + * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it + * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format + * @throws JsonProcessingException + */ + private String explainSqlPlanAsNativeQueries(DruidRel rel) throws JsonProcessingException + { + ObjectMapper jsonMapper = handlerContext.jsonMapper(); + List druidQueryList; + druidQueryList = flattenOutermostRel(rel) + .stream() + .map(druidRel -> druidRel.toDruidQuery(false)) + .collect(Collectors.toList()); + + + // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when + // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then + // serializing it using normal list method. + ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode(); + + for (DruidQuery druidQuery : druidQueryList) { + Query nativeQuery = druidQuery.getQuery(); + ObjectNode objectNode = jsonMapper.createObjectNode(); + objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class)); + objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class)); + nativeQueriesArrayNode.add(objectNode); + } + + return jsonMapper.writeValueAsString(nativeQueriesArrayNode); + } + + /** + * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel} + * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel} + * node + * E.g. a DruidRel structure of kind:


+   * DruidUnionRel
+   *  DruidUnionRel
+   *    DruidRel (A)
+   *    DruidRel (B)
+   *  DruidRel(C)
+   * will return {@code [DruidRel(A), DruidRel(B), DruidRel(C)]}.
+   *
+   * @param outermostDruidRel The outermost rel which is to be flattened
+   * @return a list of DruidRels that do not have a DruidUnionRel nested in between them
+   */
+  private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
+  {
+    List<DruidRel<?>> druidRels = new ArrayList<>();
+    flattenOutermostRel(outermostDruidRel, druidRels);
+    return druidRels;
+  }
+
+  /**
+   * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
+   * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
+   * nodes is retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
+   *
+   * @param druidRel                 The current relNode
+   * @param flattenedListAccumulator Accumulator list which needs to be appended by this method
+   */
+  private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattenedListAccumulator)
+  {
+    if (druidRel instanceof DruidUnionRel) {
+      DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
+      druidUnionRel.getInputs().forEach(innerRelNode -> {
+        DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
+        flattenOutermostRel(innerDruidRelNode, flattenedListAccumulator);
+      });
+    } else {
+      flattenedListAccumulator.add(druidRel);
+    }
+  }
+
+  protected abstract PlannerResult planForDruid() throws ValidationException;
+
+  /**
+   * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
+   */
+  protected PlannerResult planWithDruidConvention() throws ValidationException
+  {
+    final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(rootQueryRel);
+    final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot);
+    PlannerContext plannerContext = handlerContext.plannerContext();
+    plannerContext.setQueryMaker(queryMaker);
+
+    // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
+    // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
+    // successfully substitute all parameter values, and will cause a failure if any
+    // dynamic parameters are not bound. This occurs at least for DATE parameters
+    // with integer values.
+    //
+    // This check also catches the case where we did not do a parameter check earlier
+    // because no values were provided. (Values are not required in the PREPARE case
+    // but now that we're planning, we require them.)
+    RelNode parameterized = possiblyLimitedRoot.rel.accept(
+        new RelParameterizerShuttle(plannerContext)
+    );
+    CalcitePlanner planner = handlerContext.planner();
+    final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
+        CalciteRulesManager.DRUID_CONVENTION_RULES,
+        planner.getEmptyTraitSet()
+               .replace(DruidConvention.instance())
+               .plus(rootQueryRel.collation),
+        parameterized
+    );
+
+    if (explain != null) {
+      return planExplanation(druidRel, true);
+    } else {
+      // Compute row type.
+      final RelDataType rowType = prepareResult.getReturnedRowType();
+
+      // Start the query.
+      final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
+        // sanity check
+        final Set<ResourceAction> readResourceActions =
+            plannerContext.getResourceActions()
+                          .stream()
+                          .filter(action -> action.getAction() == Action.READ)
+                          .collect(Collectors.toSet());
+        Preconditions.checkState(
+            readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
+            // The resources found in the plannerContext can be less than the datasources in
+            // the query plan, because the query planner can eliminate empty tables by replacing
+            // them with InlineDataSource of empty rows.
+            || readResourceActions.size() >= druidRel.getDataSourceNames().size(),
+            "Authorization sanity check failed"
+        );
+
+        return druidRel.runQuery();
+      };
+
+      return new PlannerResult(resultsSupplier, rowType);
+    }
+  }
+
+  /**
+   * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
+   * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
+   * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
+   *
+   * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
+   * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
+   *
+   * @param root root node
+   *
+   * @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
+   */
+  @Nullable
+  private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
+  {
+    Object outerLimitObj = handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
+    Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
+    if (outerLimit == null) {
+      return root;
+    }
+
+    final LogicalSort newRootRel;
+
+    if (root.rel instanceof Sort) {
+      Sort sort = (Sort) root.rel;
+
+      final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
+      final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
+
+      if (newOffsetLimit.equals(originalOffsetLimit)) {
+        // nothing to do, don't bother to make a new sort
+        return root;
+      }
+
+      newRootRel = LogicalSort.create(
+          sort.getInput(),
+          sort.collation,
+          newOffsetLimit.getOffsetAsRexNode(rexBuilder),
+          newOffsetLimit.getLimitAsRexNode(rexBuilder)
+      );
+    } else {
+      newRootRel = LogicalSort.create(
+          root.rel,
+          root.collation,
+          null,
+          new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
+      );
+    }
+
+    return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
+  }
+
+  protected abstract QueryMaker buildQueryMaker(RelRoot rootQueryRel) throws ValidationException;
+
+  private String buildSQLPlanningErrorMessage(Throwable exception)
+  {
+    String errorMessage = handlerContext.plannerContext().getPlanningError();
+    if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
+      errorMessage = exception.getMessage();
+    }
+    if (null == errorMessage) {
+      errorMessage = "Please check Broker logs for additional details.";
+    } else {
+      // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
+      errorMessage = "Possible error: " + errorMessage;
+    }
+    // Finally, add the query itself to error message that user will get.
+    return StringUtils.format(
+        "Query not supported. %s SQL was: %s", errorMessage,
+        handlerContext.plannerContext().getSql()
+    );
+  }
+
+  public static class SelectHandler extends QueryHandler
+  {
+    private final SqlNode sqlNode;
+
+    public SelectHandler(
+        HandlerContext handlerContext,
+        SqlNode sqlNode,
+        SqlExplain explain)
+    {
+      super(handlerContext, sqlNode, explain);
+      this.sqlNode = sqlNode;
+    }
+
+    @Override
+    public SqlNode sqlNode()
+    {
+      return sqlNode;
+    }
+
+    @Override
+    public void validate() throws ValidationException
+    {
+      if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_SELECT)) {
+        throw new ValidationException(StringUtils.format(
+            "Cannot execute SELECT with SQL engine '%s'.",
+            handlerContext.engine().name())
+        );
+      }
+      super.validate();
+    }
+
+    @Override
+    protected RelDataType returnedRowType()
+    {
+      final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+      return handlerContext.engine().resultTypeForSelect(
+          typeFactory,
+          rootQueryRel.validatedRowType
+      );
+    }
+
+    @Override
+    protected PlannerResult planForDruid() throws ValidationException
+    {
+      return planWithDruidConvention();
+    }
+
+    @Override
+    protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+    {
+      return handlerContext.engine().buildQueryMakerForSelect(
+          rootQueryRel,
+          handlerContext.plannerContext());
+    }
+  }
+
+  private static class EnumeratorIterator<T> implements Iterator<T>
+  {
+    private final Iterator<T> it;
+
+    EnumeratorIterator(Iterator<T> it)
+    {
+      this.it = it;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return it.hasNext();
+    }
+
+    @Override
+    public T next()
+    {
+      return it.next();
+    }
+  }
+}
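The CTX_SQL_OUTER_LIMIT handling above never widens an existing LIMIT/OFFSET: the outer limit is composed on top of whatever window the query already has. A standalone sketch of that composition arithmetic (illustrative only; the real logic lives in OffsetLimit.andThen, whose body is not part of this patch):

public class OuterLimitSketch
{
  // Compose an inner (offset, limit) window with an outer one applied on top.
  // A negative limit stands in for "no limit".
  static long[] andThen(long innerOffset, long innerLimit, long outerOffset, long outerLimit)
  {
    long offset = innerOffset + outerOffset;
    long limit;
    if (innerLimit < 0) {
      limit = outerLimit;
    } else {
      long remaining = Math.max(0, innerLimit - outerOffset);
      limit = outerLimit < 0 ? remaining : Math.min(remaining, outerLimit);
    }
    return new long[]{offset, limit};
  }

  public static void main(String[] args)
  {
    // A query with LIMIT 100, wrapped by sqlOuterLimit=10: the effective limit is 10.
    long[] merged = andThen(0, 100, 0, 10);
    System.out.println("offset=" + merged[0] + ", limit=" + merged[1]); // offset=0, limit=10
  }
}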
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
new file mode 100644
index 000000000000..fa8c4fdb17e8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.joda.time.DateTimeZone;
+
+import java.util.Set;
+
+/**
+ * Handler for a SQL statement. Follows the same lifecycle as the planner,
+ * however this class handles one specific kind of SQL statement.
+ */
+public interface SqlStatementHandler
+{
+  SqlNode sqlNode();
+  void validate() throws ValidationException;
+  Set<ResourceAction> resourceActions();
+  void prepare();
+  PrepareResult prepareResult();
+  PlannerResult plan() throws ValidationException;
+
+  /**
+   * Context available to statement handlers.
+   */
+  interface HandlerContext
+  {
+    PlannerContext plannerContext();
+    SqlEngine engine();
+    CalcitePlanner planner();
+    QueryContext queryContext();
+    SchemaPlus defaultSchema();
+    ObjectMapper jsonMapper();
+    DateTimeZone timeZone();
+  }
+
+  abstract class BaseStatementHandler implements SqlStatementHandler
+  {
+    protected final HandlerContext handlerContext;
+    protected Set<ResourceAction> resourceActions;
+
+    protected BaseStatementHandler(HandlerContext handlerContext)
+    {
+      this.handlerContext = handlerContext;
+    }
+
+    @Override
+    public Set<ResourceAction> resourceActions()
+    {
+      return resourceActions;
+    }
+  }
+}
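For orientation, a minimal sketch of how a caller might drive a handler through this lifecycle; the driver and its authorize hook are hypothetical stand-ins, since the real sequencing lives in DruidPlanner and is not shown in this patch:

import java.util.Set;

import org.apache.calcite.tools.ValidationException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.SqlStatementHandler;

abstract class HandlerDriverSketch
{
  // Hypothetical authorization hook; real permission checks happen outside the handler.
  abstract void authorize(Set<ResourceAction> actions);

  PlannerResult planStatement(SqlStatementHandler handler) throws ValidationException
  {
    handler.validate();                    // validate and collect resource actions
    authorize(handler.resourceActions());  // caller-side permission check
    handler.prepare();                     // compute returned/parameter row types
    return handler.plan();                 // build the executable PlannerResult
  }
}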
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 185d96b945f2..fcf9fb754d8a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -133,7 +133,7 @@ public class DruidQuery
   @Nullable
   private final Sorting sorting;
 
-  private final Query query;
+  private final Query<?> query;
   private final RowSignature outputRowSignature;
   private final RelDataType outputRowType;
   private final VirtualColumnRegistry virtualColumnRegistry;
@@ -795,7 +795,7 @@ public RowSignature getOutputRowSignature()
     return outputRowSignature;
   }
 
-  public Query getQuery()
+  public Query<?> getQuery()
   {
     return query;
   }
@@ -806,7 +806,7 @@ public Query getQuery()
    *
    * @return Druid query
    */
-  private Query computeQuery()
+  private Query<?> computeQuery()
   {
     if (dataSource instanceof QueryDataSource) {
       // If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
index 9043577a7da7..7bf305d42b98 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
@@ -28,7 +28,7 @@
 import javax.annotation.Nullable;
 import java.util.Set;
 
-public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
+public abstract class DruidRel<T extends DruidRel<T>> extends AbstractRelNode
 {
   private final PlannerContext plannerContext;
 
@@ -45,7 +45,7 @@ protected DruidRel(RelOptCluster cluster, RelTraitSet traitSet, PlannerContext p
   @Nullable
   public abstract PartialDruidQuery getPartialDruidQuery();
 
-  public QueryResponse runQuery()
+  public QueryResponse<Object[]> runQuery()
   {
     // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
     // is the outermost query, and it will actually get run as a native query. Druid's native query layer will
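The sharpened bound DruidRel<T extends DruidRel<T>> is the standard recurring-generic idiom: base-class methods can now refer to "the concrete subclass" without casts. A toy illustration with hypothetical names:

// Toy version of the recurring-generic idiom; names are illustrative only.
abstract class Rel<T extends Rel<T>>
{
  // The base class can promise "returns the same concrete subtype".
  abstract T withNewInput(String input);
}

class UnionRel extends Rel<UnionRel>
{
  @Override
  UnionRel withNewInput(String input)
  {
    return this; // callers get a UnionRel back, no cast required
  }
}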
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
index de1bc8b758ea..f754fc0cf022 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
@@ -107,11 +107,11 @@ public PartialDruidQuery getPartialDruidQuery()
 
   @Override
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public QueryResponse runQuery()
+  public QueryResponse<Object[]> runQuery()
   {
     // Lazy: run each query in sequence, not all at once.
     if (limit == 0) {
-      return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
+      return new QueryResponse<>(Sequences.empty(), ResponseContext.createEmpty());
     } else {
 
       // We run the first rel here for two reasons:
@@ -122,10 +122,10 @@ public QueryResponse runQuery()
       //    is also sub-optimal as it would consume parallel query resources and potentially starve the system.
       //    Instead, we only return the headers from the first query and potentially exception out and fail the query
       //    if there are any response headers that come from subsequent queries that are correctness concerns
-      final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
+      final QueryResponse<Object[]> queryResponse = ((DruidRel) rels.get(0)).runQuery();
 
-      final List<Sequence<Object[]>> firstAsList = Collections.singletonList(queryResponse.getResults());
-      final Iterable<Sequence<Object[]>> theRestTransformed = FluentIterable
+      final List<Sequence<Object[]>> firstAsList = Collections.singletonList(queryResponse.getResults());
+      final Iterable<Sequence<Object[]>> theRestTransformed = FluentIterable
           .from(rels.subList(1, rels.size()))
           .transform(
               rel -> {
@@ -144,10 +144,10 @@ public QueryResponse runQuery()
               }
           );
 
-      final Iterable<Sequence<Object[]>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
+      final Iterable<Sequence<Object[]>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
 
       final Sequence<Object[]> returnSequence = Sequences.concat(recombinedSequences);
-      return new QueryResponse(
+      return new QueryResponse<>(
           limit > 0 ? returnSequence.limit(limit) : returnSequence,
           queryResponse.getResponseContext()
       );
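The behavioral point in the comments above is that only the first UNION ALL arm runs eagerly (to obtain its response headers), while the remaining arms are deferred inside a lazily-transformed Iterable. A plain-Java analogue of that deferral, using streams rather than Druid Sequences (illustrative only):

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class LazyUnionSketch
{
  public static void main(String[] args)
  {
    List<Supplier<Stream<String>>> arms = List.of(
        () -> { System.out.println("running arm 1"); return Stream.of("a"); },
        () -> { System.out.println("running arm 2"); return Stream.of("b"); }
    );

    // flatMap defers each supplier until the consumer actually reaches that arm.
    Stream<String> union = arms.stream().flatMap(Supplier::get);

    // With a limit of 1, arm 2 is never executed: "running arm 2" never prints.
    union.limit(1).forEach(System.out::println);
  }
}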
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index f045769ec93f..27a5462ba7c3 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -94,7 +94,7 @@ public NativeQueryMaker(
   }
 
   @Override
-  public QueryResponse runQuery(final DruidQuery druidQuery)
+  public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
   {
     final Query query = druidQuery.getQuery();
 
@@ -173,7 +173,8 @@ private List<Interval> findBaseDataSourceIntervals(Query<?> query)
                              .orElseGet(query::getIntervals);
   }
 
-  private <T> QueryResponse execute(Query query, final List<String> newFields, final List<SqlTypeName> newTypes)
+  @SuppressWarnings("unchecked")
+  private <T> QueryResponse<Object[]> execute(Query<?> query, final List<String> newFields, final List<SqlTypeName> newTypes)
   {
     Hook.QUERY_PLAN.run(query);
 
@@ -195,14 +196,19 @@ private <T> QueryResponse execute(Query query, final List<String> newFields,
     // otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
     // array-based results before starting the query; but in practice we don't expect this to happen since we keep
     // tight control over which query types we generate in the SQL layer. They all support array-based results.)
-    final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
-
-
-    return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
+    final QueryResponse<T> results = queryLifecycle.runSimple((Query<T>) query, authenticationResult, authorizationResult);
+
+    return mapResultSequence(
+        results,
+        (QueryToolChest<T, Query<T>>) queryLifecycle.getToolChest(),
+        (Query<T>) query,
+        newFields,
+        newTypes
+    );
   }
 
-  private <T> QueryResponse mapResultSequence(
-      final QueryResponse results,
+  private <T> QueryResponse<Object[]> mapResultSequence(
+      final QueryResponse<T> results,
       final QueryToolChest<T, Query<T>> toolChest,
       final Query<T> query,
       final List newFields,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
index 8acc02230cf1..8039d60c3e75 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
@@ -33,5 +33,5 @@ public interface QueryMaker
    * created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
    * {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
    */
-  QueryResponse runQuery(DruidQuery druidQuery);
+  QueryResponse<Object[]> runQuery(DruidQuery druidQuery);
 }
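With the interface genericized, implementations declare the row-array result type directly instead of returning a raw QueryResponse. A minimal, hypothetical implementation that just returns an empty typed response (real makers execute the native query, as NativeQueryMaker does above):

import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.QueryMaker;

public class EmptyQueryMaker implements QueryMaker
{
  @Override
  public QueryResponse<Object[]> runQuery(DruidQuery druidQuery)
  {
    // Empty but correctly typed: no raw QueryResponse, no caller-side casts.
    return QueryResponse.withEmptyContext(Sequences.empty());
  }
}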
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index db7f7d4ae732..dc4bfbcfb655 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -128,7 +128,7 @@ public Response doPost(
     try {
       Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
       ResultSet resultSet = stmt.plan();
-      final QueryResponse response = resultSet.run();
+      final QueryResponse<Object[]> response = resultSet.run();
       final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
       final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
 
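Downstream of run(), SqlResource drains the typed response through a Yielder, as in the hunk above. A sketch of that drain loop, assuming a QueryResponse<Object[]> in hand and eliding serialization and error handling:

import java.io.IOException;

import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.server.QueryResponse;

class YielderDrainSketch
{
  static void drain(QueryResponse<Object[]> response) throws IOException
  {
    Yielder<Object[]> yielder = Yielders.each(response.getResults());
    try {
      while (!yielder.isDone()) {
        Object[] row = yielder.get();
        // ... write the row to the HTTP output stream ...
        yielder = yielder.next(null);
      }
    }
    finally {
      yielder.close();
    }
  }
}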
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 92d0a5773f0d..46d3e7fccd28 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -43,7 +43,7 @@
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.planner.DruidPlanner;
+import org.apache.druid.sql.calcite.planner.IngestHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -204,7 +204,7 @@ public void testInsertUsingColumnList()
   {
     testIngestionQuery()
         .sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
+        .expectValidationError(SqlPlanningException.class, "INSERT with a target column list is not supported.")
         .verify();
   }
 
@@ -226,7 +226,7 @@ public void testSelectFromSystemTable()
         .sql("INSERT INTO dst SELECT * FROM INFORMATION_SCHEMA.COLUMNS PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot query table [INFORMATION_SCHEMA.COLUMNS] with SQL engine 'ingestion-test'."
+            "Cannot query table INFORMATION_SCHEMA.COLUMNS with SQL engine 'ingestion-test'."
         )
         .verify();
   }
@@ -238,7 +238,7 @@ public void testInsertIntoSystemTable()
         .sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
         )
         .verify();
   }
@@ -250,7 +250,7 @@ public void testInsertIntoView()
         .sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [view.aview] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into view.aview because it is not a Druid datasource."
         )
         .verify();
   }
@@ -280,7 +280,7 @@ public void testInsertIntoNonexistentSchema()
         .sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+            "Cannot INSERT into nonexistent.dst because it is not a Druid datasource."
         )
         .verify();
   }
@@ -435,7 +435,7 @@ public void testInsertWithoutPartitionedByWithClusteredBy()
         )
         .expectValidationError(
             SqlPlanningException.class,
-            "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"
+            "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"
         )
         .verify();
   }
@@ -517,7 +517,7 @@ public void testInsertWithClusteredByAndOrderBy()
     }
     catch (SqlPlanningException e) {
       Assert.assertEquals(
-          "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+          "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
           e.getMessage()
       );
     }
@@ -561,7 +561,7 @@ public void testInsertWithOrderBy()
     }
     catch (SqlPlanningException e) {
       Assert.assertEquals(
-          "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+          "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
           e.getMessage()
       );
     }
@@ -796,7 +796,7 @@ public void testInsertWithUnnamedColumnInSelectStatement()
         .sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
@@ -808,7 +808,7 @@ public void testInsertWithInvalidColumnNameInIngest()
         .sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
@@ -822,7 +822,7 @@ public void testInsertWithUnnamedColumnInNestedSelectStatement()
              + "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+            IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
         )
         .verify();
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index a244abef6ab4..eb31a17fc907 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -255,7 +255,7 @@ public void testReplaceWithOrderBy()
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
+        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE statement, use CLUSTERED BY instead.")
         .verify();
   }
 
@@ -390,7 +390,7 @@ public void testReplaceUsingColumnList()
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "REPLACE with target column list is not supported.")
+        .expectValidationError(SqlPlanningException.class, "REPLACE with a target column list is not supported.")
         .verify();
   }
 
@@ -408,7 +408,7 @@ public void testReplaceWithoutPartitionedByWithClusteredBy()
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo CLUSTERED BY dim1")
-        .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")
+        .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause")
         .verify();
   }
 
@@ -417,7 +417,7 @@ public void testReplaceWithoutOverwriteClause()
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
         .verify();
   }
 
@@ -426,7 +426,7 @@ public void testReplaceWithoutCompleteOverwriteClause()
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE SELECT * FROM foo PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+        .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
         .verify();
   }
 
@@ -437,7 +437,7 @@ public void testReplaceIntoSystemTable()
         .sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
         )
         .verify();
   }
@@ -449,7 +449,7 @@ public void testReplaceIntoView()
         .sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [view.aview] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into view.aview because it is not a Druid datasource."
         )
         .verify();
   }
@@ -479,7 +479,7 @@ public void testReplaceIntoNonexistentSchema()
         .sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+            "Cannot REPLACE into nonexistent.dst because it is not a Druid datasource."
         )
         .verify();
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
index 22c004947ab6..8562ce29bec5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
@@ -45,7 +45,7 @@ public TestInsertQueryMaker(
   }
 
   @Override
-  public QueryResponse runQuery(final DruidQuery druidQuery)
+  public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
   {
     // Don't actually execute anything, but do record information that tests will check for.
 
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index d1bb098b8490..88d754e0fa2b 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -2077,7 +2077,7 @@ public ResultSet createResultSet(PlannerResult plannerResult)
       return new ResultSet(plannerResult)
       {
         @Override
-        public QueryResponse run()
+        public QueryResponse<Object[]> run()
         {
           final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
               Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
@@ -2085,12 +2085,12 @@ public QueryResponse run()
           final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
           if (executeLatch != null) {
             if (executeLatch.rhs) {
-              final QueryResponse resp = super.run();
+              final QueryResponse<Object[]> resp = super.run();
               Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
               executeLatch.lhs.countDown();
               final ResponseContext respContext = resp.getResponseContext();
               respContext.merge(responseContextSupplier.get());
-              return new QueryResponse(sequence, respContext);
+              return new QueryResponse<>(sequence, respContext);
             } else {
               try {
                 if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
@@ -2103,11 +2103,11 @@ public QueryResponse run()
             }
           }
 
-          final QueryResponse resp = super.run();
+          final QueryResponse<Object[]> resp = super.run();
           Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
           final ResponseContext respContext = resp.getResponseContext();
           respContext.merge(responseContextSupplier.get());
-          return new QueryResponse(sequence, respContext);
+          return new QueryResponse<>(sequence, respContext);
         }
       };
     }