diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index f8b9dd03b726..1bc1821b94e7 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -70,24 +70,24 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f } /** - * If first argument is not null, return it, else return the other argument. Sort of like - * {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are - * null. + * If the first argument is not null, return it; else return the other argument. + * Sort of like + * {@link com.google.common.base.Objects#firstNonNull} except + * will not explode if both arguments are null. */ @Nullable public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2) { - if (arg1 == null) { - return arg2; - } - return arg1; + return arg1 == null ? arg2 : arg1; } /** - * Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture} + * Cancel futures manually, because sometimes we can't cancel all futures in + * {@code com.google.common.util.concurrent.Futures.CombinedFuture} * automatically. Especially when we call - * {@link static ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture<? extends V>> futures)} to create a batch of - * future. + * {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} + * to create a batch of futures. * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITJdbcQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITJdbcQueryTest.java index 6e3944029e15..fe95c27bc7c6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITJdbcQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITJdbcQueryTest.java @@ -211,7 +211,7 @@ public void testJdbcPrepareStatementQuery() } } - @Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* Parameter at position\\[0] is not bound") + @Test(expectedExceptions = AvaticaSqlException.class, expectedExceptionsMessageRegExp = ".* Parameter at position \\[0] is not bound") public void testJdbcPrepareStatementQueryMissingParameters() throws SQLException { for (String url : connections) { diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index 9faca5c600f2..1ce01a36f987 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -55,7 +55,6 @@ import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.planner.PlannerResult; import org.apache.druid.sql.calcite.planner.PrepareResult; -import org.apache.druid.sql.calcite.planner.ValidationResult; import org.apache.druid.sql.http.SqlParameter; import org.apache.druid.sql.http.SqlQuery; @@ -69,6 +68,7 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -106,13 +106,18 @@ public class SqlLifecycle
@GuardedBy("stateLock") private State state = State.NEW; - // init during intialize + // init during initialize private String sql; private QueryContext queryContext; private List parameters; + // init during plan + /** + * The Druid planner follows the SQL statement through the lifecycle. + * The planner's state is start --> validate --> (prepare | plan). + */ + private DruidPlanner planner; private PlannerContext plannerContext; - private ValidationResult validationResult; private PrepareResult prepareResult; private PlannerResult plannerResult; @@ -170,7 +175,7 @@ private String sqlQueryId() } /** - * Assign dynamic parameters to be used to substitute values during query exection. This can be performed at any + * Assign dynamic parameters to be used to substitute values during query execution. This can be performed at any * part of the lifecycle. */ public void setParameters(List parameters) @@ -196,14 +201,13 @@ public void validateAndAuthorize(AuthenticationResult authenticationResult) } transition(State.INITIALIZED, State.AUTHORIZING); validate(authenticationResult); - Access access = doAuthorize( + doAuthorize(resourceActions -> AuthorizationUtils.authorizeAllResourceActions( authenticationResult, - validationResult.getResourceActions(), + resourceActions, plannerFactory.getAuthorizerMapper() ) ); - checkAccess(access); } /** @@ -218,26 +222,29 @@ public void validateAndAuthorize(HttpServletRequest req) transition(State.INITIALIZED, State.AUTHORIZING); AuthenticationResult authResult = AuthorizationUtils.authenticationResultFromRequest(req); validate(authResult); - Access access = doAuthorize( + doAuthorize(resourceActions -> AuthorizationUtils.authorizeAllResourceActions( req, - validationResult.getResourceActions(), + resourceActions, plannerFactory.getAuthorizerMapper() ) ); - checkAccess(access); } - private ValidationResult validate(AuthenticationResult authenticationResult) + /** + * Perform the validation step on the Druid planner, leaving the planner + * ready to perform either prepare or plan. + */ + private void validate(AuthenticationResult authenticationResult) { - try (DruidPlanner planner = plannerFactory.createPlanner(sql, queryContext)) { + try { + planner = plannerFactory.createPlanner(sql, queryContext); // set planner context for logs/metrics in case something explodes early - this.plannerContext = planner.getPlannerContext(); - this.plannerContext.setAuthenticationResult(authenticationResult); + plannerContext = planner.getPlannerContext(); + plannerContext.setAuthenticationResult(authenticationResult); // set parameters on planner context, if parameters have already been set - this.plannerContext.setParameters(parameters); - this.validationResult = planner.validate(authConfig.authorizeQueryContextParams()); - return validationResult; + plannerContext.setParameters(parameters); + planner.validate(); } // we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors. catch (SqlParseException e) { @@ -248,46 +255,43 @@ private ValidationResult validate(AuthenticationResult authenticationResult) } } - private Access doAuthorize(final Access authorizationResult) + private void doAuthorize(Function, Access> authorizer) { + Access authorizationResult = planner.authorize( + authorizer, + authConfig.authorizeQueryContextParams()); if (!authorizationResult.isAllowed()) { // Not authorized; go straight to Jail, do not pass Go. 
transition(State.AUTHORIZING, State.UNAUTHORIZED); } else { transition(State.AUTHORIZING, State.AUTHORIZED); } - return authorizationResult; - } - - private void checkAccess(Access access) - { - plannerContext.setAuthorizationResult(access); - if (!access.isAllowed()) { - throw new ForbiddenException(access.toString()); + if (!authorizationResult.isAllowed()) { + throw new ForbiddenException(authorizationResult.toString()); } } /** - * Prepare the query lifecycle for execution, without completely planning into something that is executable, but - * including some initial parsing and validation and any dyanmic parameter type resolution, to support prepared + * Prepare the query lifecycle for execution, without completely planning into + * something that is executable, but including some initial parsing and + * validation and any dynamic parameter type resolution, to support prepared * statements via JDBC. + * + * The planner must have already performed the validation step: the planner + * state is reused here. */ - public PrepareResult prepare() throws RelConversionException + public PrepareResult prepare() { synchronized (stateLock) { if (state != State.AUTHORIZED) { - throw new ISE("Cannot prepare because current state[%s] is not [%s].", state, State.AUTHORIZED); + throw new ISE("Cannot prepare because current state [%s] is not [%s].", state, State.AUTHORIZED); } } Preconditions.checkNotNull(plannerContext, "Cannot prepare, plannerContext is null"); - try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) { + try { this.prepareResult = planner.prepare(); return prepareResult; } - // we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors. - catch (SqlParseException e) { - throw new SqlPlanningException(e); - } catch (ValidationException e) { throw new SqlPlanningException(e); } @@ -296,22 +300,27 @@ public PrepareResult prepare() /** * Plan the query to enable execution. * - * If successful, the lifecycle will first transition from {@link State#AUTHORIZED} to {@link State#PLANNED}. + * The planner must have already performed the validation step: the planner + * state is reused here. + * + * If successful, the lifecycle will first transition from + * {@link State#AUTHORIZED} to {@link State#PLANNED}. */ public void plan() throws RelConversionException { transition(State.AUTHORIZED, State.PLANNED); Preconditions.checkNotNull(plannerContext, "Cannot plan, plannerContext is null"); - try (DruidPlanner planner = plannerFactory.createPlannerWithContext(plannerContext)) { + try { this.plannerResult = planner.plan(); } - // we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors. - catch (SqlParseException e) { - throw new SqlPlanningException(e); - } catch (ValidationException e) { throw new SqlPlanningException(e); } + finally { + // Done with the planner, close it. + planner.close(); + planner = null; + } } /** @@ -376,20 +385,22 @@ public void after(boolean isDone, Throwable thrown) }); } - @VisibleForTesting - public ValidationResult runAnalyzeResources(AuthenticationResult authenticationResult) + public Set<ResourceAction> runAnalyzeResources(AuthenticationResult authenticationResult) { - return validate(authenticationResult); + validate(authenticationResult); + return getRequiredResourceActions(); } public Set<ResourceAction> getRequiredResourceActions() { - return Preconditions.checkNotNull(validationResult, "validationResult").getResourceActions(); + return planner == null + ?
null + : planner.resourceActions(authConfig.authorizeQueryContextParams()); } /** - * Cancel all native queries associated to this lifecycle. + * Cancel all native queries associated with this lifecycle. * * This method is thread-safe. */ @@ -405,7 +416,7 @@ public void cancel() final CopyOnWriteArrayList<String> nativeQueryIds = plannerContext.getNativeQueryIds(); for (String nativeQueryId : nativeQueryIds) { - log.debug("canceling native query [%s]", nativeQueryId); + log.debug("Canceling native query [%s]", nativeQueryId); queryScheduler.cancelQuery(nativeQueryId); } } @@ -440,6 +451,11 @@ public void finalizeStateAndEmitLogsAndMetrics( } } + if (planner != null) { + planner.close(); + planner = null; + } + final boolean success = e == null; final long queryTimeNs = System.nanoTime() - startNs; @@ -449,10 +465,11 @@ public void finalizeStateAndEmitLogsAndMetrics( metricBuilder.setDimension("id", plannerContext.getSqlQueryId()); metricBuilder.setDimension("nativeQueryIds", plannerContext.getNativeQueryIds().toString()); } - if (validationResult != null) { + Set<ResourceAction> actions = getRequiredResourceActions(); + if (actions != null) { metricBuilder.setDimension( "dataSource", - validationResult.getResourceActions() + actions .stream() .map(action -> action.getResource().getName()) .collect(Collectors.toList()) @@ -527,7 +544,7 @@ private void transition(final State from, final State to) } if (state != from) { throw new ISE( - "Cannot transition from[%s] to[%s] because current state[%s] is not [%s].", + "Cannot transition from [%s] to [%s] because current state [%s] is not [%s].", from, to, state, diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java index 2cd277f1c580..679f636f957d 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java @@ -89,7 +89,7 @@ public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) } if (statements.size() >= maxStatements) { - throw DruidMeta.logFailure(new ISE("Too many open statements, limit is[%,d]", maxStatements)); + throw DruidMeta.logFailure(new ISE("Too many open statements, limit is [%,d]", maxStatements)); } @SuppressWarnings("GuardedBy") @@ -100,14 +100,14 @@ public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) sqlLifecycleFactory.factorize(), () -> { // onClose function for the statement - LOG.debug("Connection[%s] closed statement[%s].", connectionId, statementId); + LOG.debug("Connection [%s] closed statement [%s].", connectionId, statementId); // statements will be accessed unsynchronized to avoid deadlock statements.remove(statementId); } ); statements.put(statementId, statement); - LOG.debug("Connection[%s] opened statement[%s].", connectionId, statementId); + LOG.debug("Connection [%s] opened statement [%s].", connectionId, statementId); return statement; } } @@ -145,11 +145,11 @@ public void close() statement.close(); } catch (Exception e) { - LOG.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId()); + LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId()); } } - LOG.debug("Connection[%s] closed.", connectionId); + LOG.debug("Connection [%s] closed.", connectionId); open = false; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/BaseStatementHandler.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/BaseStatementHandler.java new file mode 100644 index 000000000000..cfc636a7e9da --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/BaseStatementHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.ValidationException; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.DruidPlanner.SqlStatementHandler; + +import java.util.HashSet; +import java.util.Set; + +/** + * SQL-statement-specific behavior. Each statement follows the same + * lifecycle: analyze, followed by either prepare or plan (execute). + */ +abstract class BaseStatementHandler implements SqlStatementHandler +{ + protected final HandlerContext handlerContext; + protected final Set<ResourceAction> resourceActions = new HashSet<>(); + + protected BaseStatementHandler(HandlerContext handlerContext) + { + this.handlerContext = handlerContext; + } + + protected abstract SqlNode sqlNode(); + + protected SqlNode validateNode(SqlNode node) throws ValidationException + { + try { + return handlerContext.planner().validate(node); + } + catch (RuntimeException e) { + throw new ValidationException(e); + } + } + + /** + * Uses {@link SqlParameterizerShuttle} to rewrite the {@link SqlNode} to swap out any + * {@link org.apache.calcite.sql.SqlDynamicParam} early for its {@link SqlLiteral} + * replacement. + * + * @throws ValidationException if the rewrite fails + */ + protected SqlNode rewriteDynamicParameters(SqlNode parsed) throws ValidationException + { + if (handlerContext.parameters().isEmpty()) { + return parsed; + } + try { + // Parameter replacement is done only if the client provides parameter values. + // If this is a PREPARE-only, then there will be no values even if the statement contains + // parameters. If this is a PLAN, then we'll catch later the case that the statement + // contains parameters, but no values were provided.
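For readers unfamiliar with the shuttle pattern used here, a hypothetical, stripped-down cousin of `SqlParameterizerShuttle` looks roughly like this (the real class also handles type coercion and leaves unbound parameters for later error reporting):

```java
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.util.SqlShuttle;

import java.util.List;

// Hedged sketch only: swaps each dynamic parameter ("?") for a literal built
// from its bound value, one value per parameter ordinal.
class LiteralizingShuttle extends SqlShuttle
{
  private final List<Object> values;

  LiteralizingShuttle(List<Object> values)
  {
    this.values = values;
  }

  @Override
  public SqlNode visit(SqlDynamicParam param)
  {
    // Replace parameter N with a literal built from the N-th bound value.
    Object value = values.get(param.getIndex());
    return value instanceof Number
           ? SqlLiteral.createExactNumeric(value.toString(), param.getParserPosition())
           : SqlLiteral.createCharString(value.toString(), param.getParserPosition());
  }
}
```

A shuttle is applied with `rewritten = sqlNode.accept(new LiteralizingShuttle(values));`, which is the same `accept` call the handler makes just below with the real shuttle.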
+ return parsed.accept( + new SqlParameterizerShuttle( + handlerContext.plannerContext())); + } + catch (RuntimeException e) { + throw new ValidationException(e); + } + } + + @Override + public Set resourceActions() + { + return resourceActions; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java new file mode 100644 index 000000000000..440e4e80c92d --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.config.CalciteSystemProperty; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable.ViewExpander; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexExecutor; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.RelDecorrelator; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.Program; +import org.apache.calcite.tools.RelBuilder; +import 
org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; + +import javax.annotation.Nullable; + +import java.io.Reader; +import java.util.List; +import java.util.Properties; + +/** + * Calcite planner. Clone of Calcite's + * {@link org.apache.calcite.prepare.PlannerImpl}, as of version 1.21, + * but with the validator made accessible, and with the minimum of formatting + * changes needed to pass Druid's static checks. Note that the resulting code + * is more Calcite-like than Druid-like. There seemed no value in restructuring + * the code just to be more Druid-like. + */ +public class CalcitePlanner implements Planner, ViewExpander +{ + private final SqlOperatorTable operatorTable; + private final ImmutableList programs; + private final FrameworkConfig frameworkConfig; + private final Context context; + private final CalciteConnectionConfig connectionConfig; + + /** Holds the trait definitions to be registered with planner. May be null. */ + private @Nullable final List traitDefs; + + private final SqlParser.Config parserConfig; + private final SqlToRelConverter.Config sqlToRelConverterConfig; + private final SqlRexConvertletTable convertletTable; + + private State state; + + // set in STATE_2_READY + private SchemaPlus defaultSchema; + private JavaTypeFactory typeFactory; + private RelOptPlanner planner; + private RexExecutor executor; + + // set in STATE_4_VALIDATE + private SqlValidator validator; + private SqlNode validatedSqlNode; + + // set in STATE_5_CONVERT + private RelRoot root; + + public CalcitePlanner(FrameworkConfig config) + { + this.frameworkConfig = config; + this.defaultSchema = config.getDefaultSchema(); + this.operatorTable = config.getOperatorTable(); + this.programs = config.getPrograms(); + this.parserConfig = config.getParserConfig(); + this.sqlToRelConverterConfig = config.getSqlToRelConverterConfig(); + this.state = State.STATE_0_CLOSED; + this.traitDefs = config.getTraitDefs(); + this.convertletTable = config.getConvertletTable(); + this.executor = config.getExecutor(); + this.context = config.getContext(); + this.connectionConfig = connConfig(); + reset(); + } + + private CalciteConnectionConfig connConfig() + { + CalciteConnectionConfig unwrapped = context.unwrap(CalciteConnectionConfig.class); + if (unwrapped != null) { + return unwrapped; + } + Properties properties = new Properties(); + properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), + String.valueOf(parserConfig.caseSensitive())); + properties.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), + String.valueOf(frameworkConfig.getParserConfig().conformance())); + return new CalciteConnectionConfigImpl(properties); + } + + /** Makes sure that the state is at least the given state. 
*/ + private void ensure(State state) + { + if (state == this.state) { + return; + } + if (state.ordinal() < this.state.ordinal()) { + throw new IllegalArgumentException("cannot move to " + state + " from " + + this.state); + } + state.from(this); + } + + @Override + public RelTraitSet getEmptyTraitSet() + { + return planner.emptyTraitSet(); + } + + public FrameworkConfig frameworkConfig() + { + return frameworkConfig; + } + + @Override + public void close() + { + typeFactory = null; + state = State.STATE_0_CLOSED; + } + + @Override + public void reset() + { + ensure(State.STATE_0_CLOSED); + state = State.STATE_1_RESET; + } + + private void ready() + { + switch (state) { + case STATE_0_CLOSED: + reset(); + break; + default: + } + ensure(State.STATE_1_RESET); + + RelDataTypeSystem typeSystem = + connectionConfig.typeSystem(RelDataTypeSystem.class, + RelDataTypeSystem.DEFAULT); + typeFactory = new JavaTypeFactoryImpl(typeSystem); + planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), context); + RelOptUtil.registerDefaultRules(planner, + connectionConfig.materializationsEnabled(), + Hook.ENABLE_BINDABLE.get(false)); + planner.setExecutor(executor); + + state = State.STATE_2_READY; + + // If user specify own traitDef, instead of default default trait, + // register the trait def specified in traitDefs. + if (this.traitDefs == null) { + planner.addRelTraitDef(ConventionTraitDef.INSTANCE); + if (CalciteSystemProperty.ENABLE_COLLATION_TRAIT.value()) { + planner.addRelTraitDef(RelCollationTraitDef.INSTANCE); + } + } else { + for (RelTraitDef def : this.traitDefs) { + planner.addRelTraitDef(def); + } + } + } + + @Override + public SqlNode parse(final Reader reader) throws SqlParseException + { + switch (state) { + case STATE_0_CLOSED: + case STATE_1_RESET: + ready(); + break; + default: + } + ensure(State.STATE_2_READY); + SqlParser parser = SqlParser.create(reader, parserConfig); + SqlNode sqlNode = parser.parseStmt(); + state = State.STATE_3_PARSED; + return sqlNode; + } + + @Override + public SqlNode validate(SqlNode sqlNode) throws ValidationException + { + ensure(State.STATE_3_PARSED); + final SqlConformance conformance = conformance(); + final CalciteCatalogReader catalogReader = createCatalogReader(); + this.validator = + new DruidSqlValidator(operatorTable, catalogReader, typeFactory, + conformance); + this.validator.setIdentifierExpansion(true); + try { + validatedSqlNode = validator.validate(sqlNode); + } + catch (RuntimeException e) { + throw new ValidationException(e); + } + state = State.STATE_4_VALIDATED; + return validatedSqlNode; + } + + public SqlValidator getValidator() + { + return validator; + } + + private SqlConformance conformance() + { + return connectionConfig.conformance(); + } + + @Override + public Pair validateAndGetType(SqlNode sqlNode) + throws ValidationException + { + final SqlNode validatedNode = this.validate(sqlNode); + final RelDataType type = + this.validator.getValidatedNodeType(validatedNode); + return Pair.of(validatedNode, type); + } + + @Override + public final RelNode convert(SqlNode sql) + { + return rel(sql).rel; + } + + @Override + public RelRoot rel(SqlNode sql) + { + ensure(State.STATE_4_VALIDATED); + assert validatedSqlNode != null; + final RexBuilder rexBuilder = createRexBuilder(); + final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder() + .withConfig(sqlToRelConverterConfig) + .withTrimUnusedFields(false) + .withConvertTableAccess(false) + 
.build(); + final SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(this, validator, + createCatalogReader(), cluster, convertletTable, config); + root = + sqlToRelConverter.convertQuery(validatedSqlNode, false, true); + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); + final RelBuilder relBuilder = + config.getRelBuilderFactory().create(cluster, null); + root = root.withRel( + RelDecorrelator.decorrelateQuery(root.rel, relBuilder)); + state = State.STATE_5_CONVERTED; + return root; + } + + @Override public RelRoot expandView( + RelDataType rowType, + String queryString, + List schemaPath, + List viewPath) + { + if (planner == null) { + ready(); + } + SqlParser parser = SqlParser.create(queryString, parserConfig); + SqlNode sqlNode; + try { + sqlNode = parser.parseQuery(); + } + catch (SqlParseException e) { + throw new RuntimeException("parse failed", e); + } + + final SqlConformance conformance = conformance(); + final CalciteCatalogReader catalogReader = + createCatalogReader().withSchemaPath(schemaPath); + final SqlValidator validator = + new DruidSqlValidator(operatorTable, catalogReader, typeFactory, + conformance); + validator.setIdentifierExpansion(true); + + final RexBuilder rexBuilder = createRexBuilder(); + final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + final SqlToRelConverter.Config config = SqlToRelConverter + .configBuilder() + .withConfig(sqlToRelConverterConfig) + .withTrimUnusedFields(false) + .withConvertTableAccess(false) + .build(); + final SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(this, validator, + catalogReader, cluster, convertletTable, config); + + final RelRoot root = + sqlToRelConverter.convertQuery(sqlNode, true, false); + final RelRoot root2 = + root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); + final RelBuilder relBuilder = + config.getRelBuilderFactory().create(cluster, null); + return root2.withRel( + RelDecorrelator.decorrelateQuery(root.rel, relBuilder)); + } + + // CalciteCatalogReader is stateless; no need to store one + private CalciteCatalogReader createCatalogReader() + { + final SchemaPlus rootSchema = rootSchema(defaultSchema); + + return new CalciteCatalogReader( + CalciteSchema.from(rootSchema), + CalciteSchema.from(defaultSchema).path(null), + typeFactory, connectionConfig); + } + + private static SchemaPlus rootSchema(SchemaPlus schema) + { + for (;;) { + if (schema.getParentSchema() == null) { + return schema; + } + schema = schema.getParentSchema(); + } + } + + // RexBuilder is stateless; no need to store one + private RexBuilder createRexBuilder() + { + return new RexBuilder(typeFactory); + } + + @Override + public JavaTypeFactory getTypeFactory() + { + return typeFactory; + } + + @Override + public RelNode transform( + int ruleSetIndex, + RelTraitSet requiredOutputTraits, + RelNode rel + ) + { + ensure(State.STATE_5_CONVERTED); + rel.getCluster().setMetadataProvider( + new CachingRelMetadataProvider( + rel.getCluster().getMetadataProvider(), + rel.getCluster().getPlanner())); + Program program = programs.get(ruleSetIndex); + return program.run(planner, rel, requiredOutputTraits, ImmutableList.of(), + ImmutableList.of()); + } + + /** Stage of a statement in the query-preparation lifecycle. 
*/ + private enum State + { + STATE_0_CLOSED { + @Override void from(CalcitePlanner planner) + { + planner.close(); + } + }, + STATE_1_RESET { + @Override void from(CalcitePlanner planner) + { + planner.ensure(STATE_0_CLOSED); + planner.reset(); + } + }, + STATE_2_READY { + @Override void from(CalcitePlanner planner) + { + STATE_1_RESET.from(planner); + planner.ready(); + } + }, + STATE_3_PARSED, + STATE_4_VALIDATED, + STATE_5_CONVERTED; + + /** Moves planner's state to this state. This must be a higher state. */ + void from(CalcitePlanner planner) + { + throw new IllegalArgumentException("cannot move from " + planner.state + + " to " + this); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index ff139c7153f8..0bd946e67ef3 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -19,104 +19,66 @@ package org.apache.druid.sql.calcite.planner; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import org.apache.calcite.DataContext; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.config.CalciteConnectionConfigImpl; -import org.apache.calcite.config.CalciteConnectionProperty; -import org.apache.calcite.interpreter.BindableConvention; -import org.apache.calcite.interpreter.BindableRel; -import org.apache.calcite.interpreter.Bindables; -import org.apache.calcite.jdbc.CalciteSchema; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.prepare.Prepare; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.logical.LogicalSort; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlExplain; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlValidator; -import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; -import 
org.apache.calcite.util.Pair; -import org.apache.druid.common.utils.IdUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.Query; -import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; -import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; -import org.apache.druid.sql.calcite.rel.DruidConvention; -import org.apache.druid.sql.calcite.rel.DruidQuery; -import org.apache.druid.sql.calcite.rel.DruidRel; -import org.apache.druid.sql.calcite.rel.DruidUnionRel; -import org.apache.druid.sql.calcite.run.QueryMaker; +import org.apache.druid.sql.calcite.planner.IngestHandler.InsertHandler; +import org.apache.druid.sql.calcite.planner.IngestHandler.ReplaceHandler; import org.apache.druid.sql.calcite.run.QueryMakerFactory; -import org.apache.druid.utils.Throwables; -import org.joda.time.DateTimeZone; -import javax.annotation.Nullable; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.regex.Pattern; -import java.util.stream.Collectors; public class DruidPlanner implements Closeable { - private static final EmittingLogger log = new EmittingLogger(DruidPlanner.class); - private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); + public enum State + { + START, VALIDATED, PREPARED, PLANNED + } + /** + * SQL-statement-specific behavior. Each statement follows the same + * lifecycle: analyze, followed by either prepare or plan (execute). + */ + interface SqlStatementHandler + { + void analyze() throws ValidationException; + Set resourceActions(); + PrepareResult prepare() throws ValidationException; + PlannerResult plan() throws ValidationException; + } - private final FrameworkConfig frameworkConfig; - private final Planner planner; - private final PlannerContext plannerContext; - private final QueryMakerFactory queryMakerFactory; + static final EmittingLogger log = new EmittingLogger(DruidPlanner.class); + static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); + @VisibleForTesting + public static final String UNNAMED_INGESTION_COLUMN_ERROR = + "Cannot ingest expressions that do not have an alias " + + "or columns with names like EXPR$[digit].\n" + + "E.g. 
if you are ingesting \"func(X)\", then you can rewrite it as " + + "\"func(X) as myColumn\""; - private RexBuilder rexBuilder; + private final CalcitePlanner planner; + private final PlannerContext plannerContext; + final QueryMakerFactory queryMakerFactory; + private State state = State.START; + private boolean authorized; + private SqlStatementHandler handler; DruidPlanner( final FrameworkConfig frameworkConfig, @@ -124,805 +86,127 @@ public class DruidPlanner implements Closeable final QueryMakerFactory queryMakerFactory ) { - this.frameworkConfig = frameworkConfig; - this.planner = Frameworks.getPlanner(frameworkConfig); + this.planner = new CalcitePlanner(frameworkConfig); this.plannerContext = plannerContext; this.queryMakerFactory = queryMakerFactory; } - /** - * Validates a SQL query and populates {@link PlannerContext#getResourceActions()}. - * - * @return set of {@link Resource} corresponding to any Druid datasources or views which are taking part in the query. - */ - public ValidationResult validate(boolean authorizeContextParams) throws SqlParseException, ValidationException + private SqlStatementHandler createHandler(SqlNode root) throws ValidationException { - resetPlanner(); - final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone()); - final SqlValidator validator = getValidator(); - final SqlNode validatedQueryNode; - - try { - validatedQueryNode = validator.validate(rewriteDynamicParameters(parsed.getQueryNode())); - } - catch (RuntimeException e) { - throw new ValidationException(e); - } - - SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(validator, plannerContext); - validatedQueryNode.accept(resourceCollectorShuttle); - - final Set resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions()); - - if (parsed.getInsertOrReplace() != null) { - final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace()); - resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE)); - } - if (authorizeContextParams) { - plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> resourceActions.add( - new ResourceAction(new Resource(contextParam, ResourceType.QUERY_CONTEXT), Action.WRITE) - )); + HandlerContext handlerContext = new HandlerContext(plannerContext, planner, queryMakerFactory); + SqlExplain explain = null; + if (root.getKind() == SqlKind.EXPLAIN) { + explain = (SqlExplain) root; + root = explain.getExplicandum(); } - plannerContext.setResourceActions(resourceActions); - return new ValidationResult(resourceActions); - } - - /** - * Prepare an SQL query for execution, including some initial parsing and validation and any dynamic parameter type - * resolution, to support prepared statements via JDBC. - * - * In some future this could perhaps re-use some of the work done by {@link #validate(boolean)} - * instead of repeating it, but that day is not today. 
- */ - public PrepareResult prepare() throws SqlParseException, ValidationException, RelConversionException - { - resetPlanner(); - - final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone()); - final SqlNode validatedQueryNode = planner.validate(parsed.getQueryNode()); - final RelRoot rootQueryRel = planner.rel(validatedQueryNode); - - final SqlValidator validator = getValidator(); - final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); - final RelDataType parameterTypes = validator.getParameterRowType(validator.validate(validatedQueryNode)); - final RelDataType returnedRowType; - - if (parsed.getExplainNode() != null) { - returnedRowType = getExplainStructType(typeFactory); - } else { - returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertOrReplace()).getResultType(); - } - - return new PrepareResult(returnedRowType, parameterTypes); - } - - /** - * Plan an SQL query for execution, returning a {@link PlannerResult} which can be used to actually execute the query. - * - * Ideally, the query can be planned into a native Druid query, using {@link #planWithDruidConvention}, but will - * fall-back to {@link #planWithBindableConvention} if this is not possible. - * - * In some future this could perhaps re-use some of the work done by {@link #validate(boolean)} - * instead of repeating it, but that day is not today. - */ - public PlannerResult plan() throws SqlParseException, ValidationException, RelConversionException - { - resetPlanner(); - - final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone()); - - try { - if (parsed.getIngestionGranularity() != null) { - plannerContext.getQueryContext().addSystemParam( - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity()) - ); + if (root.getKind() == SqlKind.INSERT) { + if (root instanceof DruidSqlInsert) { + return new InsertHandler(handlerContext, (DruidSqlInsert) root, explain); + } else if (root instanceof DruidSqlReplace) { + return new ReplaceHandler(handlerContext, (DruidSqlReplace) root, plannerContext.getTimeZone(), explain); } } - catch (JsonProcessingException e) { - throw new ValidationException("Unable to serialize partition granularity."); - } - if (parsed.getReplaceIntervals() != null) { - plannerContext.getQueryContext().addSystemParam( - DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, - String.join(",", parsed.getReplaceIntervals()) - ); + if (root.isA(SqlKind.QUERY)) { + return new SelectHandler(handlerContext, root, explain); } - // the planner's type factory is not available until after parsing - this.rexBuilder = new RexBuilder(planner.getTypeFactory()); - final SqlNode parameterizedQueryNode = rewriteDynamicParameters(parsed.getQueryNode()); - final SqlNode validatedQueryNode = planner.validate(parameterizedQueryNode); - final RelRoot rootQueryRel = planner.rel(validatedQueryNode); - - try { - return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace()); - } - catch (Exception e) { - Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class); - if (null == cannotPlanException) { - // Not a CannotPlanningException, rethrow without trying with bindable - throw e; - } - - // If there isn't any ingestion clause, then we should try again with BINDABLE convention. 
And return without - // any error, if it is plannable by the bindable convention - if (parsed.getInsertOrReplace() == null) { - // Try again with BINDABLE convention. Used for querying Values and metadata tables. - try { - return planWithBindableConvention(rootQueryRel, parsed.getExplainNode()); - } - catch (Exception e2) { - e.addSuppressed(e2); - } - } - Logger logger = log; - if (!plannerContext.getQueryContext().isDebug()) { - logger = log.noStackTrace(); - } - String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException); - logger.warn(e, errorMessage); - throw new UnsupportedSQLQueryException(errorMessage); - } - } - - public PlannerContext getPlannerContext() - { - return plannerContext; - } - - @Override - public void close() - { - planner.close(); + throw new ValidationException(StringUtils.format("Cannot handle [%s].", root.getKind())); } /** - * While the actual query might not have changed, if the druid planner is re-used, we still have the need to reset the - * {@link #planner} since we do not re-use artifacts or keep track of state between - * {@link #validate}, {@link #prepare}, and {@link #plan} and instead repeat parsing and validation - * for each step. - * - * Currently, that state tracking is done in {@link org.apache.druid.sql.SqlLifecycle}, which will create a new - * planner for each of the corresponding steps so this isn't strictly necessary at this time, this method is here as - * much to make this situation explicit and provide context for a future refactor as anything else (and some tests - * do re-use the planner between validate, prepare, and plan, which will run into this issue). + * Validates a SQL query and populates {@link PlannerContext#getResourceActions()}. * - * This could be improved by tying {@link org.apache.druid.sql.SqlLifecycle} and {@link DruidPlanner} states more - * closely with the state of {@link #planner}, instead of repeating parsing and validation between each of these - * steps. + * The computed set of {@link Resource} covers any Druid datasources + * or views which are taking part in the query. */ - private void resetPlanner() + public void validate() throws SqlParseException, ValidationException { - planner.close(); - planner.reset(); - } - - /** - * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
- */ - private PlannerResult planWithDruidConvention( - final RelRoot root, - @Nullable final SqlExplain explain, - @Nullable final SqlInsert insertOrReplace - ) throws ValidationException, RelConversionException - { - final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root); - final QueryMaker queryMaker = buildQueryMaker(root, insertOrReplace); - plannerContext.setQueryMaker(queryMaker); - - RelNode parameterized = rewriteRelDynamicParameters(possiblyLimitedRoot.rel); - final DruidRel druidRel = (DruidRel) planner.transform( - Rules.DRUID_CONVENTION_RULES, - planner.getEmptyTraitSet() - .replace(DruidConvention.instance()) - .plus(root.collation), - parameterized - ); - - if (explain != null) { - return planExplanation(druidRel, explain, true); - } else { - final Supplier> resultsSupplier = () -> { - // sanity check - final Set readResourceActions = - plannerContext.getResourceActions() - .stream() - .filter(action -> action.getAction() == Action.READ) - .collect(Collectors.toSet()); - - Preconditions.checkState( - readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty() - // The resources found in the plannerContext can be less than the datasources in - // the query plan, because the query planner can eliminate empty tables by replacing - // them with InlineDataSource of empty rows. - || readResourceActions.size() >= druidRel.getDataSourceNames().size(), - "Authorization sanity check failed" - ); + Preconditions.checkState(state == State.START); + SqlNode root = planner.parse(plannerContext.getSql()); + handler = createHandler(root); + handler.analyze(); - return druidRel.runQuery(); - }; + final Set resourceActions = handler.resourceActions(); - return new PlannerResult(resultsSupplier, queryMaker.getResultType()); - } + plannerContext.setResourceActions(resourceActions); + state = State.VALIDATED; } /** - * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for things that are not directly translatable - * to native Druid queries such as system tables and just a general purpose (but definitely not optimized) fall-back. - * - * See {@link #planWithDruidConvention} which will handle things which are directly translatable - * to native Druid queries. + * Return the resource actions corresponding to the datasources and views which + * an authenticated request must be authorized for to process the + * query. The actions will be {@code null} if the + * planner has not yet advanced to the validation step. This may occur if + * validation fails and the caller ({@code SqlLifecycle}) accesses the resource + * actions as part of clean-up. */ - private PlannerResult planWithBindableConvention( - final RelRoot root, - @Nullable final SqlExplain explain - ) throws RelConversionException + public Set resourceActions(boolean includeContext) { - BindableRel bindableRel = (BindableRel) planner.transform( - Rules.BINDABLE_CONVENTION_RULES, - planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation), - root.rel - ); - - if (!root.isRefTrivial()) { - // Add a projection on top to accommodate root.fields. 
- final List projects = new ArrayList<>(); - final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder(); - for (int field : Pair.left(root.fields)) { - projects.add(rexBuilder.makeInputRef(bindableRel, field)); - } - bindableRel = new Bindables.BindableProject( - bindableRel.getCluster(), - bindableRel.getTraitSet(), - bindableRel, - projects, - root.validatedRowType - ); - } - - if (explain != null) { - return planExplanation(bindableRel, explain, false); + Set actions; + if (includeContext) { + actions = new HashSet<>(handler.resourceActions()); + plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> actions.add( + new ResourceAction(new Resource(contextParam, ResourceType.QUERY_CONTEXT), Action.WRITE) + )); } else { - final BindableRel theRel = bindableRel; - final DataContext dataContext = plannerContext.createDataContext( - (JavaTypeFactory) planner.getTypeFactory(), - plannerContext.getParameters() - ); - final Supplier> resultsSupplier = () -> { - final Enumerable enumerable = theRel.bind(dataContext); - final Enumerator enumerator = enumerable.enumerator(); - return Sequences.withBaggage(new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public EnumeratorIterator make() - { - return new EnumeratorIterator<>(new Iterator() - { - @Override - public boolean hasNext() - { - return enumerator.moveNext(); - } - - @Override - public Object[] next() - { - return (Object[]) enumerator.current(); - } - }); - } - - @Override - public void cleanup(EnumeratorIterator iterFromMake) - { - - } - } - ), enumerator::close); - }; - return new PlannerResult(resultsSupplier, root.validatedRowType); + actions = handler.resourceActions(); } + return actions; } /** - * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode} + * Prepare a SQL query for execution to support prepared statements via JDBC. + * The statement must have already been validated. */ - private PlannerResult planExplanation( - final RelNode rel, - final SqlExplain explain, - final boolean isDruidConventionExplanation - ) + public PrepareResult prepare() throws ValidationException { - String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel()); - String resourcesString; - try { - if (isDruidConventionExplanation && rel instanceof DruidRel) { - // Show the native queries instead of Calcite's explain if the legacy flag is turned off - if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) { - DruidRel druidRel = (DruidRel) rel; - try { - explanation = explainSqlPlanAsNativeQueries(druidRel); - } - catch (Exception ex) { - log.warn(ex, "Unable to translate to a native Druid query. 
Resorting to legacy Druid explain plan"); - } - } - } - final Set resources = - plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet()); - resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources); - } - catch (JsonProcessingException jpe) { - // this should never happen, we create the Resources here, not a user - log.error(jpe, "Encountered exception while serializing Resources for explain output"); - resourcesString = null; - } - final Supplier> resultsSupplier = Suppliers.ofInstance( - Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))); - return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory())); + Preconditions.checkState(state == State.VALIDATED); + state = State.PREPARED; + return handler.prepare(); } /** - * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose - * and not indicative of the native Druid Queries which will get executed - * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implictly cast it + * Authorizes the statement. Done within the planner to enforce the authorization + * step within the planner's state machine. * - * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it - * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format - * @throws JsonProcessingException - */ - private String explainSqlPlanAsNativeQueries(DruidRel rel) throws JsonProcessingException - { - ObjectMapper jsonMapper = plannerContext.getJsonMapper(); - List druidQueryList; - druidQueryList = flattenOutermostRel(rel) - .stream() - .map(druidRel -> druidRel.toDruidQuery(false)) - .collect(Collectors.toList()); - - - // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when - // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then - // serializing it using normal list method. 
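The Jackson idiom that comment describes, in isolation (a hedged sketch; `nativeQuery` and `outputSignature` are placeholders for the Druid query and its row signature):

```java
// Wrap each query in an ObjectNode inside an ArrayNode rather than serializing
// a List directly, which (per the comment above) loses the polymorphic
// "queryType" handling during serialization.
ObjectMapper jsonMapper = new ObjectMapper();
ArrayNode queries = jsonMapper.createArrayNode();
ObjectNode entry = jsonMapper.createObjectNode();
entry.set("query", jsonMapper.valueToTree(nativeQuery));         // placeholder: a native Query
entry.set("signature", jsonMapper.valueToTree(outputSignature)); // placeholder: a row signature
queries.add(entry);
String explanation = jsonMapper.writeValueAsString(queries);
```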
- ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode(); - - for (DruidQuery druidQuery : druidQueryList) { - Query nativeQuery = druidQuery.getQuery(); - ObjectNode objectNode = jsonMapper.createObjectNode(); - objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class)); - objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class)); - nativeQueriesArrayNode.add(objectNode); - } - - return jsonMapper.writeValueAsString(nativeQueriesArrayNode); - } - - /** - * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel} - * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel} - * node - * For eg, a DruidRel structure of kind: - * DruidUnionRel - * DruidUnionRel - * DruidRel (A) - * DruidRel (B) - * DruidRel(C) - * will return [DruidRel(A), DruidRel(B), DruidRel(C)] - * - * @param outermostDruidRel The outermost rel which is to be flattened - * @return a list of DruidRel's which donot have a DruidUnionRel nested in between them - */ - private List> flattenOutermostRel(DruidRel outermostDruidRel) - { - List> druidRels = new ArrayList<>(); - flattenOutermostRel(outermostDruidRel, druidRels); - return druidRels; - } - - /** - * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if - * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the - * nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()} - * - * @param druidRel The current relNode - * @param flattendListAccumulator Accumulator list which needs to be appended by this method - */ - private void flattenOutermostRel(DruidRel druidRel, List> flattendListAccumulator) - { - if (druidRel instanceof DruidUnionRel) { - DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel; - druidUnionRel.getInputs().forEach(innerRelNode -> { - DruidRel innerDruidRelNode = (DruidRel) innerRelNode; // This type conversion should always be possible - flattenOutermostRel(innerDruidRelNode, flattendListAccumulator); - }); - } else { - flattendListAccumulator.add(druidRel); - } - } - - /** - * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel - * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in - * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}. - * - * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by - * the web console, allowing it to apply a limit to queries without rewriting the original SQL. - * - * @param root root node - * @return root node wrapped with a limiting logical sort if a limit is specified in the query context. 
- */ - @Nullable - private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root) - { - Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT); - Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true); - if (outerLimit == null) { - return root; - } - - final LogicalSort newRootRel; - - if (root.rel instanceof Sort) { - Sort sort = (Sort) root.rel; - - final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort); - final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit)); - - if (newOffsetLimit.equals(originalOffsetLimit)) { - // nothing to do, don't bother to make a new sort - return root; - } - - newRootRel = LogicalSort.create( - sort.getInput(), - sort.collation, - newOffsetLimit.getOffsetAsRexNode(rexBuilder), - newOffsetLimit.getLimitAsRexNode(rexBuilder) - ); - } else { - newRootRel = LogicalSort.create( - root.rel, - root.collation, - null, - new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder) - ); - } - - return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation); - } - - /** - * Constructs an SQL validator, just like papa {@link #planner} uses. + * @param authorizer a function from resource actions to a {@link Access} result. + * @return the return value from the authorizer */ - private SqlValidator getValidator() + public Access authorize(Function, Access> authorizer, boolean authorizeContextParams) { - // this is sort of lame, planner won't cough up its validator, which is nice and seeded after validating a query, - // but it is private and has no accessors, so make another one so we can get the parameter types... but i suppose - // beats creating our own Prepare and Planner implementations - Preconditions.checkNotNull(planner.getTypeFactory()); - - final CalciteConnectionConfig connectionConfig; - - if (frameworkConfig.getContext() != null) { - connectionConfig = frameworkConfig.getContext().unwrap(CalciteConnectionConfig.class); - } else { - Properties properties = new Properties(); - properties.setProperty( - CalciteConnectionProperty.CASE_SENSITIVE.camelName(), - String.valueOf(PlannerFactory.PARSER_CONFIG.caseSensitive()) - ); - connectionConfig = new CalciteConnectionConfigImpl(properties); - } + Preconditions.checkState(state == State.VALIDATED); + Access access = authorizer.apply(resourceActions(authorizeContextParams)); + plannerContext.setAuthorizationResult(access); - Prepare.CatalogReader catalogReader = new CalciteCatalogReader( - CalciteSchema.from(frameworkConfig.getDefaultSchema().getParentSchema()), - CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null), - planner.getTypeFactory(), - connectionConfig - ); - - return SqlValidatorUtil.newValidator( - frameworkConfig.getOperatorTable(), - catalogReader, - planner.getTypeFactory(), - DruidConformance.instance() - ); + // Authorization is done as a flag, not a state, alas. + // Views do prepare without authorize, Avatica does authorize, then prepare, + // so the only constraint is that authorize be done after validation, before plan. + authorized = true; + return access; } /** - * Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any - * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} - * replacement + * Plan an SQL query for execution, returning a {@link PlannerResult} which + * can be used to actually execute the query. 
*/ - private SqlNode rewriteDynamicParameters(SqlNode parsed) + public PlannerResult plan() throws ValidationException { - if (!plannerContext.getParameters().isEmpty()) { - SqlParameterizerShuttle sshuttle = new SqlParameterizerShuttle(plannerContext); - return parsed.accept(sshuttle); - } - return parsed; - } - - /** - * Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle} in the event that - * {@link #rewriteDynamicParameters(SqlNode)} was unable to successfully substitute all parameter values, and will - * cause a failure if any dynamic a parameters are not bound. - */ - private RelNode rewriteRelDynamicParameters(RelNode rootRel) - { - RelParameterizerShuttle parameterizer = new RelParameterizerShuttle(plannerContext); - return rootRel.accept(parameterizer); - } - - private QueryMaker buildQueryMaker( - final RelRoot rootQueryRel, - @Nullable final SqlInsert insertOrReplace - ) throws ValidationException - { - if (insertOrReplace != null) { - final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace); - validateColumnsForIngestion(rootQueryRel); - return queryMakerFactory.buildForInsert(targetDataSource, rootQueryRel, plannerContext); - } else { - return queryMakerFactory.buildForSelect(rootQueryRel, plannerContext); - } - } - - private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory) - { - return typeFactory.createStructType( - ImmutableList.of( - Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR), - Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR) - ), - ImmutableList.of("PLAN", "RESOURCES") - ); - } - - /** - * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support. - * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema. - */ - private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException - { - final String operatorName = insert.getOperator().getName(); - if (insert.isUpsert()) { - throw new ValidationException("UPSERT is not supported."); - } - - if (insert.getTargetColumnList() != null) { - throw new ValidationException(operatorName + " with target column list is not supported."); - } - - final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); - final String dataSource; - - if (tableIdentifier.names.isEmpty()) { - // I don't think this can happen, but include a branch for it just in case. - throw new ValidationException(operatorName + " requires target table."); - } else if (tableIdentifier.names.size() == 1) { - // Unqualified name. - dataSource = Iterables.getOnlyElement(tableIdentifier.names); - } else { - // Qualified name. - final String defaultSchemaName = - Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null)); - - if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) { - dataSource = tableIdentifier.names.get(1); - } else { - throw new ValidationException( - StringUtils.format("Cannot %s into [%s] because it is not a Druid datasource.", operatorName, tableIdentifier) - ); - } - } - - try { - IdUtils.validateId(operatorName + " dataSource", dataSource); - } - catch (IllegalArgumentException e) { - throw new ValidationException(e.getMessage()); - } - - return dataSource; - } - - private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException - { - // Check that there are no unnamed columns in the insert. 
- for (Pair field : rootQueryRel.fields) { - if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) { - throw new ValidationException("Cannot ingest expressions that do not have an alias " - + "or columns with names like EXPR$[digit]." - + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as " - + "\"func(X) as myColumn\""); - } - } - } - - private String buildSQLPlanningErrorMessage(Throwable exception) - { - String errorMessage = plannerContext.getPlanningError(); - if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) { - errorMessage = exception.getMessage(); - } - if (null == errorMessage) { - errorMessage = "Please check broker logs for more details"; - } else { - // Re-phrase since planning errors are more like hints - errorMessage = "Possible error: " + errorMessage; - } - // Finally, add the query itself to error message that user will get. - return StringUtils.format("Cannot build plan for query: %s. %s", plannerContext.getSql(), errorMessage); + Preconditions.checkState(state == State.VALIDATED || state == State.PREPARED); + Preconditions.checkState(authorized); + state = State.PLANNED; + return handler.plan(); } - private static class EnumeratorIterator implements Iterator + public PlannerContext getPlannerContext() { - private final Iterator it; - - EnumeratorIterator(Iterator it) - { - this.it = it; - } - - @Override - public boolean hasNext() - { - return it.hasNext(); - } - - @Override - public T next() - { - return it.next(); - } + return plannerContext; } - private static class ParsedNodes + @Override + public void close() { - @Nullable - private final SqlExplain explain; - - @Nullable - private final SqlInsert insertOrReplace; - - private final SqlNode query; - - @Nullable - private final Granularity ingestionGranularity; - - @Nullable - private final List replaceIntervals; - - private ParsedNodes( - @Nullable SqlExplain explain, - @Nullable SqlInsert insertOrReplace, - SqlNode query, - @Nullable Granularity ingestionGranularity, - @Nullable List replaceIntervals - ) - { - this.explain = explain; - this.insertOrReplace = insertOrReplace; - this.query = query; - this.ingestionGranularity = ingestionGranularity; - this.replaceIntervals = replaceIntervals; - } - - static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException - { - SqlNode query = node; - SqlExplain explain = null; - if (query.getKind() == SqlKind.EXPLAIN) { - explain = (SqlExplain) query; - query = explain.getExplicandum(); - } - - if (query.getKind() == SqlKind.INSERT) { - if (query instanceof DruidSqlInsert) { - return handleInsert(explain, (DruidSqlInsert) query); - } else if (query instanceof DruidSqlReplace) { - return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone); - } - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, null, query, null, null); - } - - static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException - { - SqlNode query = druidSqlInsert.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead."); - 
} - } - - Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy(); - - if (druidSqlInsert.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy()); - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null); - } - - static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone) - throws ValidationException - { - SqlNode query = druidSqlReplace.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead."); - } - } - - SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery(); - if (replaceTimeQuery == null) { - throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."); - } - - Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy(); - List replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone); - - if (druidSqlReplace.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy()); - } - - if (!query.isA(SqlKind.QUERY)) { - throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); - } - - return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals); - } - - @Nullable - public SqlExplain getExplainNode() - { - return explain; - } - - @Nullable - public SqlInsert getInsertOrReplace() - { - return insertOrReplace; - } - - @Nullable - public List getReplaceIntervals() - { - return replaceIntervals; - } - - public SqlNode getQueryNode() - { - return query; - } - - @Nullable - public Granularity getIngestionGranularity() - { - return ingestionGranularity; - } + planner.close(); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java new file mode 100644 index 000000000000..963e27e56e89 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.prepare.Prepare.CatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +/** + * Clone of Calcite's {@code CalciteSqlValidator} which is not + * visible to Druid. + */ +class DruidSqlValidator extends SqlValidatorImpl +{ + protected DruidSqlValidator( + SqlOperatorTable opTab, + CatalogReader catalogReader, + RelDataTypeFactory typeFactory, + SqlConformance conformance) + { + super(opTab, catalogReader, typeFactory, conformance); + } + + @Override protected RelDataType getLogicalSourceRowType( + RelDataType sourceRowType, + SqlInsert insert + ) + { + final RelDataType superType = + super.getLogicalSourceRowType(sourceRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } + + @Override protected RelDataType getLogicalTargetRowType( + RelDataType targetRowType, + SqlInsert insert + ) + { + final RelDataType superType = + super.getLogicalTargetRowType(targetRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/HandlerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/HandlerContext.java new file mode 100644 index 000000000000..ff0b7022f105 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/HandlerContext.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.calcite.avatica.remote.TypedValue; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.druid.query.QueryContext; +import org.apache.druid.sql.calcite.run.QueryMakerFactory; + +import java.util.List; + +/** + * Resources required by statement handlers. 
+ */ +class HandlerContext +{ + private final PlannerContext plannerContext; + private final CalcitePlanner planner; + private final QueryMakerFactory queryMakerFactory; + + public HandlerContext( + final PlannerContext plannerContext, + final CalcitePlanner planner, + final QueryMakerFactory queryMakerFactory) + { + this.plannerContext = plannerContext; + this.planner = planner; + this.queryMakerFactory = queryMakerFactory; + } + + protected PlannerContext plannerContext() + { + return plannerContext; + } + + protected FrameworkConfig frameworkConfig() + { + return planner.frameworkConfig(); + } + + protected CalcitePlanner planner() + { + return planner; + } + + public List parameters() + { + return plannerContext.getParameters(); + } + + public QueryContext queryContext() + { + return plannerContext.getQueryContext(); + } + + public ObjectMapper jsonMapper() + { + return plannerContext.getJsonMapper(); + } + + public QueryMakerFactory queryMakerFactory() + { + return queryMakerFactory; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java new file mode 100644 index 000000000000..1a1e44f451c2 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Iterables; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.sql.SqlExplain; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; +import org.apache.druid.sql.calcite.parser.DruidSqlReplace; +import org.apache.druid.sql.calcite.run.QueryMaker; +import org.joda.time.DateTimeZone; + +import java.util.List; + +/** + * Handler for ingestion: base class for INSERT and REPLACE. 
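+ * Ingest statements reuse the SELECT machinery of QueryHandler: analyze()
+ * records the PARTITIONED BY granularity in the query context, rewrites
+ * CLUSTERED BY into an ORDER BY on the source query, resolves the target
+ * datasource, and registers a WRITE action on it before validating the
+ * underlying SELECT.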
+ */
+abstract class IngestHandler extends QueryHandler
+{
+  protected Granularity ingestionGranularity;
+  protected String targetDataSource;
+
+  IngestHandler(HandlerContext handlerContext, SqlInsert insertNode, SqlExplain explain)
+  {
+    super(handlerContext, insertNode.getSource(), explain);
+  }
+
+  @Override
+  protected boolean allowsBindableExec()
+  {
+    return false;
+  }
+
+  protected abstract SqlInsert ingestNode();
+
+  protected void analyzeIngest() throws ValidationException
+  {
+    try {
+      if (ingestionGranularity != null) {
+        handlerContext.queryContext().addSystemParam(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+            handlerContext.jsonMapper().writeValueAsString(ingestionGranularity)
+        );
+      }
+    }
+    catch (JsonProcessingException e) {
+      throw new ValidationException("Unable to serialize partition granularity.");
+    }
+
+    // Check that an ORDER BY clause was not provided to the underlying query
+    if (queryNode instanceof SqlOrderBy) {
+      SqlOrderBy sqlOrderBy = (SqlOrderBy) queryNode;
+      SqlNodeList orderByList = sqlOrderBy.orderList;
+      if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+        String queryType = ingestNode() instanceof DruidSqlReplace ? "a REPLACE" : "an INSERT";
+        throw new ValidationException(
+            "Cannot have ORDER BY on " + queryType + " query, use CLUSTERED BY instead.");
+      }
+    }
+    targetDataSource = validateAndGetDataSourceForIngest(ingestNode());
+    resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
+  }
+
+  /**
+   * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
+   * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
+   */
+  private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException
+  {
+    final String operatorName = insert.getOperator().getName();
+    if (insert.isUpsert()) {
+      throw new ValidationException("UPSERT is not supported.");
+    }
+
+    if (insert.getTargetColumnList() != null) {
+      throw new ValidationException(operatorName + " with target column list is not supported.");
+    }
+
+    final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
+    final String dataSource;
+
+    if (tableIdentifier.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new ValidationException(operatorName + " requires target table.");
+    } else if (tableIdentifier.names.size() == 1) {
+      // Unqualified name.
+      dataSource = Iterables.getOnlyElement(tableIdentifier.names);
+    } else {
+      // Qualified name.
+      final String defaultSchemaName =
+          Iterables.getOnlyElement(CalciteSchema.from(handlerContext.frameworkConfig().getDefaultSchema()).path(null));
+
+      if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
+        dataSource = tableIdentifier.names.get(1);
+      } else {
+        throw new ValidationException(
+            StringUtils.format("Cannot %s into [%s] because it is not a Druid datasource.", operatorName, tableIdentifier)
+        );
+      }
+    }
+
+    try {
+      IdUtils.validateId(operatorName + " dataSource", dataSource);
+    }
+    catch (IllegalArgumentException e) {
+      throw new ValidationException(e.getMessage());
+    }
+
+    return dataSource;
+  }
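+
+  // Behavior sketch for the resolution above, assuming the default schema
+  // resolves to "druid" (the actual name comes from the framework config):
+  //   INSERT INTO dst SELECT ...          -> datasource "dst" (unqualified)
+  //   INSERT INTO "druid".dst SELECT ...  -> datasource "dst" (default schema)
+  //   INSERT INTO sys.dst SELECT ...      -> ValidationException: not a Druid datasource
+  //   UPSERT INTO dst SELECT ...          -> ValidationException: UPSERT is not supported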
+
+  protected void verifyQuery(SqlNodeList clusteredBy) throws ValidationException
+  {
+    if (clusteredBy != null) {
+      queryNode = DruidSqlParserUtils.convertClusterByToOrderBy(queryNode, clusteredBy);
+    }
+
+    if (!queryNode.isA(SqlKind.QUERY)) {
+      throw new ValidationException(StringUtils.format("Cannot execute [%s].", queryNode.getKind()));
+    }
+  }
+
+  @Override
+  protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+  {
+    validateColumnsForIngestion(rootQueryRel);
+    return handlerContext.queryMakerFactory().buildForInsert(
+        targetDataSource,
+        rootQueryRel,
+        handlerContext.plannerContext());
+  }
+
+  private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
+  {
+    // Check that there are no unnamed columns in the insert.
+    for (Pair<Integer, String> field : rootQueryRel.fields) {
+      if (DruidPlanner.UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
+        throw new ValidationException(DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR);
+      }
+    }
+  }
+
+  /**
+   * Handler for the INSERT statement.
+   */
+  protected static class InsertHandler extends IngestHandler
+  {
+    private final DruidSqlInsert sqlNode;
+
+    public InsertHandler(HandlerContext handlerContext, DruidSqlInsert sqlNode, SqlExplain explain)
+    {
+      super(handlerContext, sqlNode, explain);
+      this.sqlNode = sqlNode;
+    }
+
+    @Override
+    protected SqlNode sqlNode()
+    {
+      return sqlNode;
+    }
+
+    @Override
+    protected SqlInsert ingestNode()
+    {
+      return sqlNode;
+    }
+
+    @Override
+    public void analyze() throws ValidationException
+    {
+      ingestionGranularity = sqlNode.getPartitionedBy();
+      analyzeIngest();
+      verifyQuery(sqlNode.getClusteredBy());
+      validateQuery();
+    }
+  }
+
+  /**
+   * Handler for the REPLACE statement.
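+   * For example (illustrative SQL, not taken from this patch):
+   *   REPLACE INTO dst OVERWRITE ALL SELECT * FROM src PARTITIONED BY DAY
+   *   REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2022-01-01'
+   *     SELECT * FROM src PARTITIONED BY DAY
+   * Omitting the OVERWRITE clause fails validation; the computed intervals
+   * travel to the task in the DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS
+   * context parameter.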
+ */ + protected static class ReplaceHandler extends IngestHandler + { + private final DruidSqlReplace sqlNode; + private final DateTimeZone timeZone; + private List replaceIntervals; + + public ReplaceHandler( + HandlerContext handlerContext, + DruidSqlReplace sqlNode, + DateTimeZone timeZone, + SqlExplain explain + ) + { + super(handlerContext, sqlNode, explain); + this.sqlNode = sqlNode; + this.timeZone = timeZone; + } + + @Override + protected SqlNode sqlNode() + { + return sqlNode; + } + + @Override + protected SqlInsert ingestNode() + { + return sqlNode; + } + + @Override + public void analyze() throws ValidationException + { + ingestionGranularity = sqlNode.getPartitionedBy(); + analyzeIngest(); + + SqlNode replaceTimeQuery = sqlNode.getReplaceTimeQuery(); + if (replaceTimeQuery == null) { + throw new ValidationException( + "Missing time chunk information in OVERWRITE clause for REPLACE, " + + "set it to OVERWRITE WHERE <__time based condition> or set it to " + + "overwrite the entire table with OVERWRITE ALL."); + } + + replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals( + replaceTimeQuery, + ingestionGranularity, + timeZone); + if (replaceIntervals != null) { + handlerContext.queryContext().addSystemParam( + DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, + String.join(",", replaceIntervals) + ); + } + + verifyQuery(sqlNode.getClusteredBy()); + validateQuery(); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 41b5a6340c53..1aae09c4ae19 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -88,7 +88,7 @@ public class PlannerContext // result of authentication, providing identity to authorize set of resources produced by validation private AuthenticationResult authenticationResult; // set of datasources and views which must be authorized, initialized to null so we can detect if it has been set. 
- private Set resourceActions = null; + private Set resourceActions; // result of authorizing set of resources against authentication identity private Access authorizationResult; // error messages encountered while planning the query diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java index bf2b96fa3480..0380deff624d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java @@ -109,15 +109,7 @@ public DruidPlanner createPlanner(final String sql, final QueryContext queryCont queryContext ); - return createPlannerWithContext(context); - } - - /** - * Create a new Druid query planner, re-using a previous {@link PlannerContext} - */ - public DruidPlanner createPlannerWithContext(final PlannerContext plannerContext) - { - return new DruidPlanner(buildFrameworkConfig(plannerContext), plannerContext, queryMakerFactory); + return new DruidPlanner(buildFrameworkConfig(context), context, queryMakerFactory); } /** @@ -131,12 +123,12 @@ public DruidPlanner createPlannerForTesting(final Map queryConte thePlanner.getPlannerContext() .setAuthenticationResult(NoopEscalator.getInstance().createEscalatedAuthenticationResult()); try { - thePlanner.validate(false); + thePlanner.validate(); } catch (SqlParseException | ValidationException e) { throw new RuntimeException(e); } - thePlanner.getPlannerContext().setAuthorizationResult(Access.OK); + thePlanner.authorize(ra -> Access.OK, false); return thePlanner; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java new file mode 100644 index 000000000000..520186717cf1 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.planner; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.interpreter.BindableConvention; +import org.apache.calcite.interpreter.BindableRel; +import org.apache.calcite.interpreter.Bindables; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlExplain; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.BaseSequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.Query; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.rel.DruidConvention; +import org.apache.druid.sql.calcite.rel.DruidQuery; +import org.apache.druid.sql.calcite.rel.DruidRel; +import org.apache.druid.sql.calcite.rel.DruidUnionRel; +import org.apache.druid.sql.calcite.run.QueryMaker; +import org.apache.druid.utils.Throwables; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Abstract base class for handlers that revolve around queries: SELECT, + * INSERT and REPLACE. This class handles the common SELECT portion of the statement. 
+ */
+abstract class QueryHandler extends BaseStatementHandler
+{
+  protected SqlNode queryNode;
+  protected SqlExplain explain;
+  protected SqlNode validatedQueryNode;
+  private RelRoot rootQueryRel;
+
+  public QueryHandler(HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
+  {
+    super(handlerContext);
+    this.queryNode = sqlNode;
+    this.explain = explain;
+  }
+
+  protected abstract boolean allowsBindableExec();
+
+  protected SqlNode validateQuery() throws ValidationException
+  {
+    validatedQueryNode = validateNode(rewriteDynamicParameters(queryNode));
+    SqlResourceCollectorShuttle resourceCollectorShuttle =
+        new SqlResourceCollectorShuttle(
+            handlerContext.planner().getValidator(),
+            handlerContext.plannerContext());
+    validatedQueryNode.accept(resourceCollectorShuttle);
+    resourceActions.addAll(resourceCollectorShuttle.getResourceActions());
+    return validatedQueryNode;
+  }
+
+  @Override
+  public PrepareResult prepare() throws ValidationException
+  {
+    rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
+
+    final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+    final SqlValidator validator = handlerContext.planner().getValidator();
+    final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode);
+    final RelDataType returnedRowType;
+
+    if (explain != null) {
+      returnedRowType = getExplainStructType(typeFactory);
+    } else {
+      returnedRowType = buildQueryMaker(rootQueryRel).getResultType();
+    }
+    return new PrepareResult(returnedRowType, parameterTypes);
+  }
+
+  private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
+  {
+    return typeFactory.createStructType(
+        ImmutableList.of(
+            Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR),
+            Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
+        ),
+        ImmutableList.of("PLAN", "RESOURCES")
+    );
+  }
+
+  protected abstract QueryMaker buildQueryMaker(RelRoot rootQueryRel) throws ValidationException;
+
+  /**
+   * Plan an SQL query for execution, returning a {@link PlannerResult} which
+   * can be used to actually execute the query.
+   *
+   * Ideally, the query can be planned into a native Druid query, using
+   * {@link #planWithDruidConvention}, but will fall back to
+   * {@link #planWithBindableConvention} if this is not possible.
+   */
+  @Override
+  public PlannerResult plan() throws ValidationException
+  {
+    CalcitePlanner planner = handlerContext.planner();
+    if (rootQueryRel == null) {
+      // Set if the prepare step was done; null if we jumped straight to plan.
+      rootQueryRel = planner.rel(validatedQueryNode);
+    }
+
+    // The planner's type factory is not available until after parsing.
+    RexBuilder rexBuilder = new RexBuilder(planner.getTypeFactory());
+
+    try {
+      return planWithDruidConvention(rootQueryRel, rexBuilder);
+    }
+    catch (Exception e) {
+      Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
+      if (null == cannotPlanException) {
+        // Not a CannotPlanException, rethrow without trying with bindable
+        throw e;
+      }
+
+      // If there is no ingestion clause, we should try again with BINDABLE convention,
+      // and return without error if the query is plannable by the bindable convention.
+      if (allowsBindableExec()) {
+        // Try again with BINDABLE convention. Used for querying Values and metadata tables.
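+        // For example (assumed queries): SELECT 1 (a VALUES query) or
+        // SELECT * FROM INFORMATION_SCHEMA.TABLES (a metadata table) cannot
+        // become native Druid queries and fall through to this path.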
+        try {
+          return planWithBindableConvention(rootQueryRel);
+        }
+        catch (Exception e2) {
+          e.addSuppressed(e2);
+        }
+      }
+      Logger logger = DruidPlanner.log;
+      if (!handlerContext.queryContext().isDebug()) {
+        logger = DruidPlanner.log.noStackTrace();
+      }
+      String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
+      logger.warn(e, errorMessage);
+      throw new UnsupportedSQLQueryException(errorMessage);
+    }
+  }
+
+  /**
+   * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
+   */
+  private PlannerResult planWithDruidConvention(
+      final RelRoot root,
+      final RexBuilder rexBuilder
+  ) throws ValidationException
+  {
+    CalcitePlanner planner = handlerContext.planner();
+    final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root, rexBuilder);
+
+    // Create query maker before applying the Druid rules. The rules refer
+    // to the query maker via the planner context.
+    final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot);
+    handlerContext.plannerContext().setQueryMaker(queryMaker);
+
+    // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
+    // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
+    // successfully substitute all parameter values, and will cause a failure if any
+    // dynamic parameters are not bound. This occurs at least for DATE parameters
+    // with integer values.
+    //
+    // This check also catches the case where we did not do a parameter check earlier
+    // because no values were provided. (Values are not required in the PREPARE case
+    // but now that we're planning, we require them.)
+    RelNode parameterized = possiblyLimitedRoot.rel.accept(
+        new RelParameterizerShuttle(handlerContext.plannerContext()));
+    final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
+        Rules.DRUID_CONVENTION_RULES,
+        planner.getEmptyTraitSet()
+               .replace(DruidConvention.instance())
+               .plus(root.collation),
+        parameterized
+    );
+
+    if (explain != null) {
+      return planExplanation(druidRel, true);
+    } else {
+      return planDruidExecution(druidRel, possiblyLimitedRoot, queryMaker);
+    }
+  }
+
+  /**
+   * This method wraps the root with a {@link LogicalSort} that applies a limit
+   * (no ordering change). If the outer rel is already a {@link Sort}, we can
+   * merge our outerLimit into it, similar to what is going on in
+   * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
+   *
+   * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this
+   * wrapping is meant for internal use only by the web console, allowing it
+   * to apply a limit to queries without rewriting the original SQL.
+   *
+   * @param root root node
+   * @return root node wrapped with a limiting logical sort if a limit is
+   * specified in the query context.
+   */
+  @Nullable
+  private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root, RexBuilder rexBuilder)
+  {
+    Object outerLimitObj = handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
+    Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
+    if (outerLimit == null) {
+      return root;
+    }
+
+    final LogicalSort newRootRel;
+
+    if (root.rel instanceof Sort) {
+      Sort sort = (Sort) root.rel;
+
+      final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
+      final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
+
+      if (newOffsetLimit.equals(originalOffsetLimit)) {
+        // nothing to do, don't bother to make a new sort
+        return root;
+      }
+
+      newRootRel = LogicalSort.create(
+          sort.getInput(),
+          sort.collation,
+          newOffsetLimit.getOffsetAsRexNode(rexBuilder),
+          newOffsetLimit.getLimitAsRexNode(rexBuilder)
+      );
+    } else {
+      newRootRel = LogicalSort.create(
+          root.rel,
+          root.collation,
+          null,
+          new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
+      );
+    }
+
+    return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
+  }
+
+  private PlannerResult planDruidExecution(
+      final DruidRel<?> druidRel,
+      RelRoot possiblyLimitedRoot,
+      QueryMaker queryMaker
+  )
+  {
+    final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
+      // sanity check
+      final Set<ResourceAction> readResourceActions =
+          handlerContext
+              .plannerContext()
+              .getResourceActions()
+              .stream()
+              .filter(action -> action.getAction() == Action.READ)
+              .collect(Collectors.toSet());
+
+      // TODO: This is not really a state check since there is a race condition.
+      // This can be seen as verifying that a check was done, or as redoing the
+      // check with the latest info (if the permissions are updated in between).
+      Preconditions.checkState(
+          readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
+          // The resources found in the plannerContext can be less than the datasources in
+          // the query plan, because the query planner can eliminate empty tables by replacing
+          // them with InlineDataSource of empty rows.
+          || readResourceActions.size() >= druidRel.getDataSourceNames().size(),
+          "Authorization sanity check failed"
+      );
+
+      return druidRel.runQuery();
+    };
+
+    return new PlannerResult(resultsSupplier, queryMaker.getResultType());
+  }
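+
+  // To illustrate possiblyWrapRootWithOuterLimitFromContext() above with an
+  // assumed context of {"sqlOuterLimit": 100} (the key behind CTX_SQL_OUTER_LIMIT):
+  //   SELECT ... LIMIT 500  -> the Sort is rebuilt with an effective limit of 100;
+  //   SELECT ... LIMIT 50   -> offset/limit are unchanged, the root is returned as-is;
+  //   a root with no Sort   -> wrapped in a new LogicalSort with fetch = 100.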
+
+  /**
+   * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
+   * things that are not directly translatable to native Druid queries, such
+   * as system tables, and as a general-purpose (but definitely not optimized)
+   * fall-back.
+   *
+   * See {@link #planWithDruidConvention} which handles things that are
+   * directly translatable to native Druid queries.
+   *
+   * The bindable path handles parameter substitution of any values not
+   * bound by the earlier steps.
+   */
+  private PlannerResult planWithBindableConvention(
+      final RelRoot root
+  )
+  {
+    CalcitePlanner planner = handlerContext.planner();
+    BindableRel bindableRel = (BindableRel) planner.transform(
+        Rules.BINDABLE_CONVENTION_RULES,
+        planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
+        root.rel
+    );
+
+    if (!root.isRefTrivial()) {
+      // Add a projection on top to accommodate root.fields.
+      final List<RexNode> projects = new ArrayList<>();
+      final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
+      for (int field : Pair.left(root.fields)) {
+        projects.add(rexBuilder.makeInputRef(bindableRel, field));
+      }
+      bindableRel = new Bindables.BindableProject(
+          bindableRel.getCluster(),
+          bindableRel.getTraitSet(),
+          bindableRel,
+          projects,
+          root.validatedRowType
+      );
+    }
+
+    if (explain != null) {
+      return planExplanation(bindableRel, false);
+    } else {
+      return planBindableExecution(root, bindableRel);
+    }
+  }
+
+  /**
+   * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
+   */
+  private PlannerResult planExplanation(
+      final RelNode rel,
+      final boolean isDruidConventionExplanation
+  )
+  {
+    PlannerContext plannerContext = handlerContext.plannerContext();
+    String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
+    String resourcesString;
+    try {
+      if (isDruidConventionExplanation && rel instanceof DruidRel) {
+        // Show the native queries instead of Calcite's explain if the legacy flag is turned off
+        if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
+          DruidRel<?> druidRel = (DruidRel<?>) rel;
+          try {
+            explanation = explainSqlPlanAsNativeQueries(druidRel);
+          }
+          catch (Exception ex) {
+            DruidPlanner.log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan");
+          }
+        }
+      }
+      final Set<Resource> resources =
+          plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
+      resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
+    }
+    catch (JsonProcessingException jpe) {
+      // this should never happen, we create the Resources here, not a user
+      DruidPlanner.log.error(jpe, "Encountered exception while serializing Resources for explain output");
+      resourcesString = null;
+    }
+    final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
+        Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})));
+    return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
+  }
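+
+  // Shape of an EXPLAIN result row (assumed, for illustration): a single row
+  // with two VARCHAR columns, PLAN and RESOURCES, e.g.
+  //   PLAN      -> the Calcite plan dump, or a JSON array of native queries
+  //                when useNativeQueryExplain is enabled (see below)
+  //   RESOURCES -> [{"name":"foo","type":"DATASOURCE"}]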
+
+  /**
+   * This method doesn't utilize Calcite's internal {@link RelOptUtil#dumpPlan}
+   * since that tends to be verbose and not indicative of the native Druid queries
+   * which will get executed. This method assumes that the planner has converted
+   * the RelNodes to DruidRels, so the cast is safe.
+   *
+   * @param rel Instance of the root {@link DruidRel} which is formed by running
+   *            the planner transformations on it
+   * @return A string representing an array of native queries that correspond to the
+   *         given SQL query, in JSON format
+   * @throws JsonProcessingException if the native queries cannot be serialized
+   */
+  private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
+  {
+    ObjectMapper jsonMapper = handlerContext.plannerContext().getJsonMapper();
+    List<DruidQuery> druidQueryList;
+    druidQueryList = flattenOutermostRel(rel)
+        .stream()
+        .map(druidRel -> druidRel.toDruidQuery(false))
+        .collect(Collectors.toList());
+
+    // Putting the queries as object nodes in an ArrayNode, since directly
+    // returning a list causes issues when serializing the "queryType".
+    // Another method would be to create a POJO containing query and signature,
+    // and then serialize it using the normal list method.
+    ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
+
+    for (DruidQuery druidQuery : druidQueryList) {
+      Query<?> nativeQuery = druidQuery.getQuery();
+      ObjectNode objectNode = jsonMapper.createObjectNode();
+      objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
+      objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
+      nativeQueriesArrayNode.add(objectNode);
+    }
+
+    return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
+  }
+
+  /**
+   * Given a {@link DruidRel}, this method recursively flattens the Rels if
+   * they are of the type {@link DruidUnionRel}. It is implicitly assumed
+   * that the {@link DruidUnionRel} can never be the child of a non
+   * {@link DruidUnionRel} node.
+   *
+   * For example, a DruidRel structure of kind:
+   * DruidUnionRel
+   *  DruidUnionRel
+   *   DruidRel (A)
+   *   DruidRel (B)
+   *  DruidRel (C)
+   * will return [DruidRel(A), DruidRel(B), DruidRel(C)]
+   *
+   * @param outermostDruidRel The outermost rel which is to be flattened
+   * @return a list of DruidRels which do not have a DruidUnionRel nested in between them
+   */
+  private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
+  {
+    List<DruidRel<?>> druidRels = new ArrayList<>();
+    flattenOutermostRel(outermostDruidRel, druidRels);
+    return druidRels;
+  }
+
+  /**
+   * Recursive function (DFS) which traverses the nodes and collects the
+   * corresponding {@link DruidRel} into a list if they are not of the
+   * type {@link DruidUnionRel} or else calls the method with the child
+   * nodes. The DFS order of the nodes is retained, since that is the
+   * order in which they will actually be called in
+   * {@link DruidUnionRel#runQuery()}.
+   *
+   * @param druidRel The current relNode
+   * @param flattenedListAccumulator Accumulator list which needs to be appended by this method
+   */
+  private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattenedListAccumulator)
+  {
+    if (druidRel instanceof DruidUnionRel) {
+      DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
+      druidUnionRel.getInputs().forEach(innerRelNode -> {
+        DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
+        flattenOutermostRel(innerDruidRelNode, flattenedListAccumulator);
+      });
+    } else {
+      flattenedListAccumulator.add(druidRel);
+    }
+  }
+
+  private PlannerResult planBindableExecution(
+      final RelRoot root,
+      final BindableRel bindableRel
+  )
+  {
+    CalcitePlanner planner = handlerContext.planner();
+    final BindableRel theRel = bindableRel;
+    final DataContext dataContext = handlerContext.plannerContext().createDataContext(
+        (JavaTypeFactory) planner.getTypeFactory(),
+        handlerContext.parameters()
+    );
+    final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
+      final Enumerable<?> enumerable = theRel.bind(dataContext);
+      final Enumerator<?> enumerator = enumerable.enumerator();
+      return Sequences.withBaggage(new BaseSequence<>(
+          new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
+          {
+            @Override
+            public EnumeratorIterator<Object[]> make()
+            {
+              return new EnumeratorIterator<>(new Iterator<Object[]>()
+              {
+                @Override
+                public boolean hasNext()
+                {
+                  return enumerator.moveNext();
+                }
+
+                @Override
+                public Object[] next()
+                {
+                  return (Object[]) enumerator.current();
+                }
+              });
+            }
+
+            @Override
+            public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
+            {
+
+            }
+          }
+      ), enumerator::close);
+    };
+    return new PlannerResult(resultsSupplier, root.validatedRowType);
+  }
+
+  private String buildSQLPlanningErrorMessage(Throwable exception)
+  {
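+    // The result reads like one of these (wording illustrative):
+    //   Cannot build plan for query: <sql>. Possible error: <planning hint>
+    //   Cannot build plan for query: <sql>. Please check broker logs for more details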
+    PlannerContext plannerContext = handlerContext.plannerContext();
+    String errorMessage = plannerContext.getPlanningError();
+    if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
+      errorMessage = exception.getMessage();
+    }
+    if (null == errorMessage) {
+      errorMessage = "Please check broker logs for more details";
+    } else {
+      // Re-phrase since planning errors are more like hints
+      errorMessage = "Possible error: " + errorMessage;
+    }
+    // Finally, add the query itself to the error message that the user will get.
+    return StringUtils.format("Cannot build plan for query: %s. %s", plannerContext.getSql(), errorMessage);
+  }
+
+  private static class EnumeratorIterator<T> implements Iterator<T>
+  {
+    private final Iterator<T> it;
+
+    EnumeratorIterator(Iterator<T> it)
+    {
+      this.it = it;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return it.hasNext();
+    }
+
+    @Override
+    public T next()
+    {
+      return it.next();
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/RelParameterizerShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/RelParameterizerShuttle.java
index 8029a89d97b9..cd9b1c2d2138 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/RelParameterizerShuttle.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/RelParameterizerShuttle.java
@@ -203,7 +203,7 @@ private RexNode bind(RexNode node, RexBuilder builder, RelDataTypeFactory typeFa
     if (param == null) {
       throw new SqlPlanningException(
           PlanningError.VALIDATION_ERROR,
-          StringUtils.format("Parameter at position[%s] is not bound", dynamicParam.getIndex())
+          StringUtils.format("Parameter at position [%s] is not bound", dynamicParam.getIndex())
       );
     }
     if (param.value == null) {
@@ -218,7 +218,7 @@ private RexNode bind(RexNode node, RexBuilder builder, RelDataTypeFactory typeFa
     } else {
       throw new SqlPlanningException(
           PlanningError.VALIDATION_ERROR,
-          StringUtils.format("Parameter at position[%s] is not bound", dynamicParam.getIndex())
+          StringUtils.format("Parameter at position [%s] is not bound", dynamicParam.getIndex())
       );
     }
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SelectHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SelectHandler.java
new file mode 100644
index 000000000000..60f720fd1576
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SelectHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.sql.calcite.run.QueryMaker;
+
+/**
+ * Handler for the SELECT statement.
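+ * Unlike the ingest handlers, SELECT permits the bindable fall-back path
+ * (allowsBindableExec() returns true), so VALUES queries and metadata
+ * tables remain executable.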
+ */
+class SelectHandler extends QueryHandler
+{
+  private final SqlNode sqlNode;
+
+  public SelectHandler(HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
+  {
+    super(handlerContext, sqlNode, explain);
+    this.sqlNode = sqlNode;
+  }
+
+  @Override
+  protected boolean allowsBindableExec()
+  {
+    return true;
+  }
+
+  @Override
+  protected SqlNode sqlNode()
+  {
+    return sqlNode;
+  }
+
+  @Override
+  public void analyze() throws ValidationException
+  {
+    validateQuery();
+  }
+
+  @Override
+  protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+  {
+    return handlerContext.queryMakerFactory().buildForSelect(
+        rootQueryRel,
+        handlerContext.plannerContext());
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
index f73ff1de580d..456415ee67b7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlParameterizerShuttle.java
@@ -29,13 +29,26 @@ import org.apache.druid.java.util.common.IAE;
 
 /**
- * Replaces all {@link SqlDynamicParam} encountered in an {@link SqlNode} tree with a {@link SqlLiteral} if a value
- * binding exists for the parameter, if possible. This is used in tandem with {@link RelParameterizerShuttle}.
+ * Replaces all {@link SqlDynamicParam} encountered in an {@link SqlNode} tree
+ * with a {@link SqlLiteral} if a value binding exists for the parameter, if
+ * possible. This is used in tandem with {@link RelParameterizerShuttle}.
 *
- * It is preferable that all parameters are placed here to pick up as many optimizations as possible, but the facilities
- * to convert jdbc types to {@link SqlLiteral} are a bit less rich here than exist for converting a
- * {@link org.apache.calcite.rex.RexDynamicParam} to {@link org.apache.calcite.rex.RexLiteral}, which is why
- * {@link SqlParameterizerShuttle} and {@link RelParameterizerShuttle} both exist.
+ * It is preferable that all parameters are placed here to pick up as many
+ * optimizations as possible, but the facilities to convert JDBC types to
+ * {@link SqlLiteral} are a bit less rich here than exist for converting a
+ * {@link org.apache.calcite.rex.RexDynamicParam} to
+ * {@link org.apache.calcite.rex.RexLiteral}, which is why
+ * {@link SqlParameterizerShuttle} and {@link RelParameterizerShuttle}
+ * both exist.
+ *
+ * As it turns out, most parameters will be replaced in this shuttle.
+ * The one exception is DATE types expressed as integers. For reasons
+ * known only to Calcite, the {@code RexBuilder.clean()} method, used by
+ * {@code RelParameterizerShuttle}, handles integer values for dates,
+ * but the {@code SqlTypeName.createLiteral()} method used here does
+ * not. As a result, DATE parameters will be left as parameters to be
+ * filled in later. Fortunately, this does not affect optimizations as
+ * there are no rules that optimize based on the value of a DATE.
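+ *
+ * For illustration (bindings assumed): given
+ * {@code SELECT * FROM t WHERE __time > ? AND dim1 = ?} with JDBC bindings
+ * [DATE as Integer 18993 (days since epoch), VARCHAR "abc"], this shuttle
+ * replaces the VARCHAR binding with a {@link SqlLiteral}, while the integer
+ * DATE triggers the {@code ClassCastException} described above and survives
+ * to be bound later by {@link RelParameterizerShuttle} via {@code RexBuilder}.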
*/ public class SqlParameterizerShuttle extends SqlShuttle { @@ -49,35 +62,37 @@ public SqlParameterizerShuttle(PlannerContext plannerContext) @Override public SqlNode visit(SqlDynamicParam param) { - try { - if (plannerContext.getParameters().size() > param.getIndex()) { - TypedValue paramBinding = plannerContext.getParameters().get(param.getIndex()); - if (paramBinding == null) { - throw new IAE("Parameter at position[%s] is not bound", param.getIndex()); - } - if (paramBinding.value == null) { - return SqlLiteral.createNull(param.getParserPosition()); - } - SqlTypeName typeName = SqlTypeName.getNameForJdbcType(paramBinding.type.typeId); - if (SqlTypeName.APPROX_TYPES.contains(typeName)) { - return SqlLiteral.createApproxNumeric(paramBinding.value.toString(), param.getParserPosition()); - } - if (SqlTypeName.TIMESTAMP.equals(typeName) && paramBinding.value instanceof Long) { - return SqlLiteral.createTimestamp( - TimestampString.fromMillisSinceEpoch((Long) paramBinding.value), - 0, - param.getParserPosition() - ); - } + if (plannerContext.getParameters().size() <= param.getIndex()) { + throw new IAE("Parameter at position [%s] is not bound", param.getIndex()); + } + TypedValue paramBinding = plannerContext.getParameters().get(param.getIndex()); + if (paramBinding == null) { + throw new IAE("Parameter at position [%s] is not bound", param.getIndex()); + } + if (paramBinding.value == null) { + return SqlLiteral.createNull(param.getParserPosition()); + } + SqlTypeName typeName = SqlTypeName.getNameForJdbcType(paramBinding.type.typeId); + if (SqlTypeName.APPROX_TYPES.contains(typeName)) { + return SqlLiteral.createApproxNumeric(paramBinding.value.toString(), param.getParserPosition()); + } + if (SqlTypeName.TIMESTAMP.equals(typeName) && paramBinding.value instanceof Long) { + return SqlLiteral.createTimestamp( + TimestampString.fromMillisSinceEpoch((Long) paramBinding.value), + 0, + param.getParserPosition() + ); + } - return typeName.createLiteral(paramBinding.value, param.getParserPosition()); - } else { - throw new IAE("Parameter at position[%s] is not bound", param.getIndex()); - } + try { + // This throws ClassCastException for a DATE parameter given as + // an Integer. The parameter is left in place and is replaced + // properly later by RelParameterizerShuttle. + return typeName.createLiteral(paramBinding.value, param.getParserPosition()); } catch (ClassCastException ignored) { // suppress + return param; } - return param; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ValidationResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ValidationResult.java deleted file mode 100644 index 206bd28436d3..000000000000 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ValidationResult.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql.calcite.planner; - -import com.google.common.collect.ImmutableSet; -import org.apache.druid.server.security.Resource; -import org.apache.druid.server.security.ResourceAction; - -import java.util.Set; - -/** - * If an SQL query can be validated by {@link DruidPlanner}, the resulting artifact is the set of {@link Resource} - * corresponding to the datasources and views which an authenticated request must be authorized for to process the - * query. - */ -public class ValidationResult -{ - private final Set resourceActions; - - public ValidationResult( - final Set resourceActions - ) - { - this.resourceActions = ImmutableSet.copyOf(resourceActions); - } - - public Set getResourceActions() - { - return resourceActions; - } -} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/DruidViewMacro.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/DruidViewMacro.java index 1d0f70493afd..b4739cb9a4db 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/DruidViewMacro.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/DruidViewMacro.java @@ -58,7 +58,8 @@ public TranslatableTable apply(final List arguments) { final RelDataType rowType; try (final DruidPlanner planner = plannerFactory.createPlanner(viewSql, new QueryContext())) { - rowType = planner.plan().rowType(); + planner.validate(); + rowType = planner.prepare().getRowType(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java index 64f2e2dcfe33..d04b1a7da659 100644 --- a/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java @@ -42,7 +42,6 @@ import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.planner.PlannerResult; import org.apache.druid.sql.calcite.planner.PrepareResult; -import org.apache.druid.sql.calcite.planner.ValidationResult; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.http.SqlParameter; import org.easymock.EasyMock; @@ -51,8 +50,8 @@ import org.junit.Test; import javax.servlet.http.HttpServletRequest; + import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -137,18 +136,15 @@ public void testStateTransition() // test authorization DruidPlanner mockPlanner = EasyMock.createMock(DruidPlanner.class); PlannerContext mockPlannerContext = EasyMock.createMock(PlannerContext.class); - ValidationResult validationResult = new ValidationResult(Collections.emptySet()); EasyMock.expect(plannerFactory.createPlanner(EasyMock.eq(sql), EasyMock.anyObject())).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.getPlannerContext()).andReturn(mockPlannerContext).once(); mockPlannerContext.setAuthenticationResult(CalciteTests.REGULAR_USER_AUTH_RESULT); EasyMock.expectLastCall(); mockPlannerContext.setParameters(parameters); EasyMock.expectLastCall(); - EasyMock.expect(plannerFactory.getAuthorizerMapper()).andReturn(CalciteTests.TEST_AUTHORIZER_MAPPER).once(); - mockPlannerContext.setAuthorizationResult(Access.OK); + mockPlanner.validate(); EasyMock.expectLastCall(); - EasyMock.expect(mockPlanner.validate(false)).andReturn(validationResult).once(); - mockPlanner.close(); + 
EasyMock.expect(mockPlanner.authorize(EasyMock.anyObject(), EasyMock.eq(false))).andReturn(Access.OK).once(); EasyMock.expectLastCall(); EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, mockPlanner, mockPlannerContext); @@ -160,9 +156,7 @@ public void testStateTransition() // test prepare PrepareResult mockPrepareResult = EasyMock.createMock(PrepareResult.class); - EasyMock.expect(plannerFactory.createPlannerWithContext(EasyMock.eq(mockPlannerContext))).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.prepare()).andReturn(mockPrepareResult).once(); - mockPlanner.close(); EasyMock.expectLastCall(); EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, mockPlanner, mockPlannerContext, mockPrepareResult); lifecycle.prepare(); @@ -173,7 +167,6 @@ public void testStateTransition() // test plan PlannerResult mockPlanResult = EasyMock.createMock(PlannerResult.class); - EasyMock.expect(plannerFactory.createPlannerWithContext(EasyMock.eq(mockPlannerContext))).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.plan()).andReturn(mockPlanResult).once(); mockPlanner.close(); EasyMock.expectLastCall(); @@ -216,8 +209,8 @@ public void testStateTransition() public void testStateTransitionHttpRequest() throws ValidationException, SqlParseException, RelConversionException, IOException { - // this test is a duplicate of testStateTransition except with a slight variation of how validate and authorize - // is run + // this test is a duplicate of testStateTransition except with a slight + // variation of how validate and authorize is run SqlLifecycle lifecycle = sqlLifecycleFactory.factorize(); final String sql = "select 1 + ?"; Assert.assertEquals(SqlLifecycle.State.NEW, lifecycle.getState()); @@ -233,25 +226,21 @@ public void testStateTransitionHttpRequest() // test authorization DruidPlanner mockPlanner = EasyMock.createMock(DruidPlanner.class); PlannerContext mockPlannerContext = EasyMock.createMock(PlannerContext.class); - ValidationResult validationResult = new ValidationResult(Collections.emptySet()); EasyMock.expect(plannerFactory.createPlanner(EasyMock.eq(sql), EasyMock.anyObject())).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.getPlannerContext()).andReturn(mockPlannerContext).once(); mockPlannerContext.setAuthenticationResult(CalciteTests.REGULAR_USER_AUTH_RESULT); EasyMock.expectLastCall(); mockPlannerContext.setParameters(parameters); EasyMock.expectLastCall(); - EasyMock.expect(plannerFactory.getAuthorizerMapper()).andReturn(CalciteTests.TEST_AUTHORIZER_MAPPER).once(); - mockPlannerContext.setAuthorizationResult(Access.OK); + mockPlanner.validate(); EasyMock.expectLastCall(); - EasyMock.expect(mockPlanner.validate(false)).andReturn(validationResult).once(); - mockPlanner.close(); + EasyMock.expect(mockPlanner.authorize(EasyMock.anyObject(), EasyMock.eq(false))).andReturn(Access.OK).once(); EasyMock.expectLastCall(); + // Note: can't verify the request usage with mocks: the checks run inside + // a function that the mocked authorize() never actually invokes.
HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT).times(2); - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).once(); - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).once(); - request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT).once(); EasyMock.expectLastCall(); EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, mockPlanner, mockPlannerContext, request); @@ -262,9 +251,7 @@ public void testStateTransitionHttpRequest() // test prepare PrepareResult mockPrepareResult = EasyMock.createMock(PrepareResult.class); - EasyMock.expect(plannerFactory.createPlannerWithContext(EasyMock.eq(mockPlannerContext))).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.prepare()).andReturn(mockPrepareResult).once(); - mockPlanner.close(); EasyMock.expectLastCall(); EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, mockPlanner, mockPlannerContext, mockPrepareResult); lifecycle.prepare(); @@ -275,7 +262,6 @@ public void testStateTransitionHttpRequest() // test plan PlannerResult mockPlanResult = EasyMock.createMock(PlannerResult.class); - EasyMock.expect(plannerFactory.createPlannerWithContext(EasyMock.eq(mockPlannerContext))).andReturn(mockPlanner).once(); EasyMock.expect(mockPlanner.plan()).andReturn(mockPlanResult).once(); mockPlanner.close(); EasyMock.expectLastCall(); diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 85c1599dfe53..2f43ec1a3193 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -123,7 +123,7 @@ public abstract class DruidAvaticaHandlerTest extends CalciteTestBase @Override public int getMaxConnections() { - // This must match the number of Connection objects created in setUp() + // This must match the number of Connection objects created in testTooManyStatements() return 4; } @@ -242,8 +242,6 @@ public void configure(Binder binder) clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles); } - - @After public void tearDown() throws Exception { @@ -719,7 +717,6 @@ public void testDatabaseMetaDataColumnsWithSuperuser() throws Exception ); } - @Test(timeout = 90_000L) public void testConcurrentQueries() throws Exception { @@ -754,14 +751,14 @@ public void testConcurrentQueries() throws Exception @Test public void testTooManyStatements() throws Exception { - final Statement statement1 = client.createStatement(); - final Statement statement2 = client.createStatement(); - final Statement statement3 = client.createStatement(); - final Statement statement4 = client.createStatement(); + client.createStatement(); + client.createStatement(); + client.createStatement(); + client.createStatement(); expectedException.expect(AvaticaClientRuntimeException.class); - expectedException.expectMessage("Too many open statements, limit is[4]"); - final Statement statement5 = client.createStatement(); + expectedException.expectMessage("Too many open statements, limit is [4]"); + client.createStatement(); } @Test @@ -849,7 +846,7 @@ public 
void testTooManyConnections() throws Exception expectedException.expect(AvaticaClientRuntimeException.class); expectedException.expectMessage("Too many connections"); - final Connection connection5 = DriverManager.getConnection(url); + DriverManager.getConnection(url); } @Test @@ -864,7 +861,7 @@ public void testNotTooManyConnectionsWhenTheyAreEmpty() throws Exception final Connection connection3 = DriverManager.getConnection(url); connection3.createStatement().close(); - final Connection connection4 = DriverManager.getConnection(url); + DriverManager.getConnection(url); Assert.assertTrue(true); } @@ -1051,7 +1048,6 @@ public Frame fetch( } @Test - @SuppressWarnings("unchecked") public void testSqlRequestLog() throws Exception { // valid sql @@ -1435,6 +1431,7 @@ private static List> getRows(final ResultSet resultSet, fina } } + @SafeVarargs private static Map row(final Pair... entries) { final Map m = new HashMap<>(); diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java index dd643aaf14a6..d0e2894a9558 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java @@ -51,6 +51,7 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -121,15 +122,15 @@ public void testSignature() Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType); Assert.assertEquals(sql, signature.sql); Assert.assertEquals( - Lists.newArrayList( - Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"), - Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"), - Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"), - Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"), - Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"), - Lists.newArrayList("m1", "FLOAT", "java.lang.Float"), - Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"), - Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object") + Arrays.asList( + Arrays.asList("__time", "TIMESTAMP", "java.lang.Long"), + Arrays.asList("cnt", "BIGINT", "java.lang.Number"), + Arrays.asList("dim1", "VARCHAR", "java.lang.String"), + Arrays.asList("dim2", "VARCHAR", "java.lang.String"), + Arrays.asList("dim3", "VARCHAR", "java.lang.String"), + Arrays.asList("m1", "FLOAT", "java.lang.Float"), + Arrays.asList("m2", "DOUBLE", "java.lang.Double"), + Arrays.asList("unique_dim1", "OTHER", "java.lang.Object") ), Lists.transform( signature.columns, @@ -138,7 +139,7 @@ public void testSignature() @Override public List apply(final ColumnMetaData columnMetaData) { - return Lists.newArrayList( + return Arrays.asList( columnMetaData.label, columnMetaData.type.name, columnMetaData.type.rep.clazz.getName() @@ -161,7 +162,7 @@ public void testSubQueryWithOrderBy() Meta.Frame.create( 0, true, - Lists.newArrayList( + Arrays.asList( new Object[]{""}, new Object[]{ "1" @@ -189,7 +190,7 @@ public void testSelectAllInFirstFrame() Meta.Frame.create( 0, true, - Lists.newArrayList( + Arrays.asList( new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a", 1.0f}, new Object[]{ DateTimes.of("2000-01-02").getMillis(), @@ -221,7 +222,7 @@ public void testSelectSplitOverTwoFrames() Meta.Frame.create( 0, false, - Lists.newArrayList( + Arrays.asList( new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a", 1.0f}, new Object[]{ 
DateTimes.of("2000-01-02").getMillis(), @@ -242,7 +243,7 @@ public void testSelectSplitOverTwoFrames() Meta.Frame.create( 2, true, - Lists.newArrayList( + Arrays.asList( new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, "2", "", 3.0f}, new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, "1", "a", 4.0f}, new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, "def", "abc", 5.0f}, @@ -255,6 +256,7 @@ public void testSelectSplitOverTwoFrames() } } + @SuppressWarnings("resource") private DruidStatement statement(String sql) { return new DruidStatement( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index e5b190d90095..46d5405a30a2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -987,7 +987,7 @@ public Set analyzeResources( SqlLifecycle lifecycle = lifecycleFactory.factorize(); lifecycle.initialize(sql, new QueryContext(contexts)); - return lifecycle.runAnalyzeResources(authenticationResult).getResourceActions(); + return lifecycle.runAnalyzeResources(authenticationResult); } public SqlLifecycleFactory getSqlLifecycleFactory( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index aa7a8eb8856f..1c41feb8d9cb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -38,6 +38,7 @@ import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.planner.DruidPlanner; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.util.CalciteTests; import org.hamcrest.CoreMatchers; @@ -699,10 +700,7 @@ public void testInsertWithUnnamedColumnInSelectStatement() .sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL") .expectValidationError( SqlPlanningException.class, - "Cannot ingest expressions that do not have an alias " - + "or columns with names like EXPR$[digit]." - + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as " - + "\"func(X) as myColumn\"" + DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR ) .verify(); } @@ -714,10 +712,7 @@ public void testInsertWithInvalidColumnNameInIngest() .sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL") .expectValidationError( SqlPlanningException.class, - "Cannot ingest expressions that do not have an alias " - + "or columns with names like EXPR$[digit]." - + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as " - + "\"func(X) as myColumn\"" + DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR ) .verify(); } @@ -731,10 +726,7 @@ public void testInsertWithUnnamedColumnInNestedSelectStatement() + "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME") .expectValidationError( SqlPlanningException.class, - "Cannot ingest expressions that do not have an alias " - + "or columns with names like EXPR$[digit]." - + "E.g. 
if you are ingesting \"func(X)\", then you can rewrite it as " - + "\"func(X) as myColumn\"" + DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR ) .verify(); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java index 27500323875a..179e690c1b14 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java @@ -47,10 +47,11 @@ import java.util.List; /** - * This class has copied a subset of the tests in {@link CalciteQueryTest} and replaced various parts of queries with - * dynamic parameters. It is NOT important that this file remains in sync with {@link CalciteQueryTest}, the tests - * were merely chosen to produce a selection of parameter types and positions within query expressions and have been - * renamed to reflect this + * This class has copied a subset of the tests in {@link CalciteQueryTest} and + * replaced various parts of queries with dynamic parameters. It is NOT + * important that this file remains in sync with {@link CalciteQueryTest}; the + * tests were merely chosen to produce a selection of parameter types and + * positions within query expressions and have been renamed to reflect this. */ public class CalciteParameterQueryTest extends BaseCalciteQueryTest { @@ -577,7 +578,7 @@ public void testLongs() throws Exception public void testMissingParameter() throws Exception { expectedException.expect(SqlPlanningException.class); - expectedException.expectMessage("Parameter at position[0] is not bound"); + expectedException.expectMessage("Parameter at position [0] is not bound"); testQuery( "SELECT COUNT(*)\n" + "FROM druid.numfoo\n" @@ -592,7 +593,7 @@ public void testMissingParameter() throws Exception public void testPartiallyMissingParameter() throws Exception { expectedException.expect(SqlPlanningException.class); - expectedException.expectMessage("Parameter at position[1] is not bound"); + expectedException.expectMessage("Parameter at position [1] is not bound"); testQuery( "SELECT COUNT(*)\n" + "FROM druid.numfoo\n" @@ -610,7 +611,7 @@ public void testPartiallyMissingParameterInTheMiddle() throws Exception params.add(null); params.add(new SqlParameter(SqlType.INTEGER, 1)); expectedException.expect(SqlPlanningException.class); - expectedException.expectMessage("Parameter at position[0] is not bound"); + expectedException.expectMessage("Parameter at position [0] is not bound"); testQuery( "SELECT 1 + ?, dim1 FROM foo LIMIT ?", ImmutableList.of(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java index 79ccebf9ab85..e4426909fece 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java @@ -21,8 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -59,6 +57,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; + import 
java.io.IOException; import java.util.List; import java.util.Map; @@ -172,13 +171,13 @@ public SqlVectorizedExpressionSanityTest(String query) } @Test - public void testQuery() throws SqlParseException, RelConversionException, ValidationException + public void testQuery() throws ValidationException { sanityTestVectorizedSqlQueries(PLANNER_FACTORY, query); } public static void sanityTestVectorizedSqlQueries(PlannerFactory plannerFactory, String query) - throws ValidationException, RelConversionException, SqlParseException + throws ValidationException { final Map vector = ImmutableMap.of( QueryContexts.VECTORIZE_KEY, "force", diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 3e89d8122996..2a36d08c47e4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -86,11 +86,11 @@ public class DruidSchemaTest extends DruidSchemaTestCommon { - private SpecificSegmentsQuerySegmentWalker walker = null; + private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView serverView; private List druidServers; - private DruidSchema schema = null; - private DruidSchema schema2 = null; + private DruidSchema schema; + private DruidSchema schema2; private CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
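Taken together, the DruidViewMacro and SqlLifecycleTest hunks above converge on a single stateful planner protocol: create the planner once, then validate(), authorize(...), and finally either prepare() or plan() before close(), in place of the old validate-returns-ValidationResult flow. The compilable sketch below (not part of the patch) spells out that call sequence. The class name, the throws clause, and the ForbiddenException thrown on a denied Access are illustrative assumptions; the planner method names and signatures are the ones exercised in the hunks above.

import java.util.List;

import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PrepareResult;

/**
 * Hypothetical walk-through of the revised planner lifecycle:
 * start --> validate --> authorize --> (prepare | plan).
 */
class PlannerLifecycleSketch
{
  PrepareResult prepareOnce(
      PlannerFactory plannerFactory,
      AuthConfig authConfig,
      AuthenticationResult authenticationResult,
      String sql,
      QueryContext queryContext,
      List<TypedValue> parameters
  ) throws Exception
  {
    try (DruidPlanner planner = plannerFactory.createPlanner(sql, queryContext)) {
      // Set the planner context for logs/metrics, as SqlLifecycle.validate() does.
      PlannerContext plannerContext = planner.getPlannerContext();
      plannerContext.setAuthenticationResult(authenticationResult);
      plannerContext.setParameters(parameters);

      // One validation pass; the planner carries its state forward, so the
      // removed createPlannerWithContext() round trip is no longer needed.
      planner.validate();

      // The planner feeds the resource actions (formerly surfaced through
      // ValidationResult) to the caller-provided authorizer function.
      Access access = planner.authorize(
          resourceActions -> AuthorizationUtils.authorizeAllResourceActions(
              authenticationResult,
              resourceActions,
              plannerFactory.getAuthorizerMapper()
          ),
          authConfig.authorizeQueryContextParams()
      );
      if (!access.isAllowed()) {
        // Illustrative error handling; the patch leaves this to the caller.
        throw new ForbiddenException(access.toString());
      }

      // Exactly one of prepare() or plan() follows validation.
      return planner.prepare();
    }
  }
}

The DruidViewMacro hunk above follows the same shape minus the authorization step, calling validate() and then prepare().getRowType(), while SqlLifecycle ends its flow with plan() to obtain a PlannerResult instead.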