diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java new file mode 100644 index 000000000000..e4020cc86d0c --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.CalciteIngestionDmlTest; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.fail; + +/** + * Test the use of catalog specs to drive MSQ ingestion. + */ +public class CatalogIngestionTest extends CalciteIngestionDmlTest +{ + @ClassRule + public static final TestDerbyConnector.DerbyConnectorRule DERBY_CONNECTION_RULE = + new TestDerbyConnector.DerbyConnectorRule(); + + /** + * Signature for the foo datasource after applying catalog metadata. 
+ */
+  private static final RowSignature FOO_SIGNATURE = RowSignature.builder()
+      .add("__time", ColumnType.LONG)
+      .add("extra1", ColumnType.STRING)
+      .add("dim2", ColumnType.STRING)
+      .add("dim1", ColumnType.STRING)
+      .add("cnt", ColumnType.LONG)
+      .add("m1", ColumnType.DOUBLE)
+      .add("extra2", ColumnType.LONG)
+      .add("extra3", ColumnType.STRING)
+      .add("m2", ColumnType.DOUBLE)
+      .build();
+
+  private static CatalogStorage storage;
+
+  @Override
+  public CatalogResolver createCatalogResolver()
+  {
+    CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
+    storage = dbFixture.storage;
+    MetadataCatalog catalog = new CachedMetadataCatalog(
+        storage,
+        storage.schemaRegistry(),
+        storage.jsonMapper()
+    );
+    return new LiveCatalogResolver(catalog);
+  }
+
+  @Override
+  public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
+  {
+    super.finalizeTestFramework(sqlTestFramework);
+    buildTargetDatasources();
+    buildFooDatasource();
+  }
+
+  private void buildTargetDatasources()
+  {
+    TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H")
+        .build();
+    createTableMetadata(spec);
+  }
+
+  public void buildFooDatasource()
+  {
+    TableMetadata spec = TableBuilder.datasource("foo", "ALL")
+        .timeColumn()
+        .column("extra1", null)
+        .column("dim2", null)
+        .column("dim1", null)
+        .column("cnt", null)
+        .column("m1", Columns.DOUBLE)
+        .column("extra2", Columns.LONG)
+        .column("extra3", Columns.STRING)
+        .hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
+        .sealed(true)
+        .build();
+    createTableMetadata(spec);
+  }
+
+  private void createTableMetadata(TableMetadata table)
+  {
+    try {
+      storage.tables().create(table);
+    }
+    catch (CatalogException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * If the segment grain is given in the catalog, then that value is used.
+   */
+  @Test
+  public void testInsertHourGrain()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO hourDs\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
+                .context(queryContextWithGranularity(Granularities.HOUR))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
+   * the query value is used.
+ */ + @Test + public void testInsertHourGrainWithDay() + { + testIngestionQuery() + .sql("INSERT INTO hourDs\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index f8fc01b9369f..8319f0c48b9f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -162,6 +162,9 @@ import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; +import org.apache.druid.sql.calcite.external.HttpOperatorConversion; +import org.apache.druid.sql.calcite.external.InlineOperatorConversion; +import org.apache.druid.sql.calcite.external.LocalOperatorConversion; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -351,6 +354,9 @@ public void configure(Binder binder) binder.install(new NestedDataModule()); NestedDataModule.registerHandlersAndSerde(); SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class); } @Override diff --git a/sql/src/main/codegen/includes/common.ftl b/sql/src/main/codegen/includes/common.ftl index 95138a7dbbf1..1edc542dc990 100644 --- a/sql/src/main/codegen/includes/common.ftl +++ b/sql/src/main/codegen/includes/common.ftl @@ -107,14 +107,14 @@ SqlTypeNameSpec DruidType() : } // Parses the supported file formats for export. 
-String FileFormat() : +SqlIdentifier FileFormat() : { SqlNode format; } { format = SimpleIdentifier() { - return format.toString(); + return (SqlIdentifier) format; } } diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl index 81f5ed1253e3..0a949aec4334 100644 --- a/sql/src/main/codegen/includes/insert.ftl +++ b/sql/src/main/codegen/includes/insert.ftl @@ -35,7 +35,7 @@ SqlNode DruidSqlInsertEof() : final Pair p; SqlGranularityLiteral partitionedBy = null; SqlNodeList clusteredBy = null; - String exportFileFormat = null; + SqlIdentifier exportFileFormat = null; } { ( diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl index 15edeaac12e3..d067bc450bb8 100644 --- a/sql/src/main/codegen/includes/replace.ftl +++ b/sql/src/main/codegen/includes/replace.ftl @@ -30,7 +30,7 @@ SqlNode DruidSqlReplaceEof() : SqlNodeList clusteredBy = null; final Pair p; SqlNode replaceTimeQuery = null; - String exportFileFormat = null; + SqlIdentifier exportFileFormat = null; } { { s = span(); } @@ -90,7 +90,7 @@ SqlNode DruidSqlReplaceEof() : { sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList); - return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat); + return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat, replaceTimeQuery); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java index a36ef9b6b96e..cce253b4c1ef 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -43,7 +44,7 @@ public abstract class DruidSqlIngest extends SqlInsert @Nullable protected final SqlNodeList clusteredBy; @Nullable - private final String exportFileFormat; + private final SqlIdentifier exportFileFormat; public DruidSqlIngest( SqlParserPos pos, @@ -53,7 +54,7 @@ public DruidSqlIngest( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { super(pos, keywords, targetTable, source, columnList); @@ -76,7 +77,7 @@ public SqlNodeList getClusteredBy() } @Nullable - public String getExportFileFormat() + public SqlIdentifier getExportFileFormat() { return exportFileFormat; } @@ -88,6 +89,7 @@ public List getOperandList() .addAll(super.getOperandList()) .add(partitionedBy) .add(clusteredBy) + .add(exportFileFormat) .build(); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java index 7171a889ae0d..e283c9df9586 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java @@ -19,12 +19,14 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import 
org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -39,13 +41,13 @@ public class DruidSqlInsert extends DruidSqlIngest public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity"; // This allows reusing super.unparse - public static final SqlOperator OPERATOR = SqlInsert.OPERATOR; + public static final SqlOperator OPERATOR = DruidSqlIngestOperator.INSERT_OPERATOR; public static DruidSqlInsert create( @Nonnull SqlInsert insertNode, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { return new DruidSqlInsert( @@ -74,7 +76,7 @@ public DruidSqlInsert( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { super( @@ -110,7 +112,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) writer.newlineAndIndent(); if (getExportFileFormat() != null) { writer.keyword("AS"); - writer.print(getExportFileFormat()); + writer.print(getExportFileFormat().toString()); writer.newlineAndIndent(); } getSource().unparse(writer, 0, 0); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java index 86f78b4d6d75..45b677631d2a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java @@ -19,16 +19,16 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; +import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -43,7 +43,7 @@ public class DruidSqlReplace extends DruidSqlIngest { public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks"; - public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER); + public static final SqlOperator OPERATOR = DruidSqlIngestOperator.REPLACE_OPERATOR; private final SqlNode replaceTimeQuery; @@ -51,8 +51,8 @@ public static DruidSqlReplace create( @Nonnull SqlInsert insertNode, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable SqlNode replaceTimeQuery, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat, + @Nullable SqlNode replaceTimeQuery ) { return new DruidSqlReplace( @@ -63,8 +63,8 @@ public static DruidSqlReplace create( insertNode.getTargetColumnList(), partitionedBy, clusteredBy, - replaceTimeQuery, - exportFileFormat + exportFileFormat, + replaceTimeQuery ); } @@ -82,8 +82,8 @@ public DruidSqlReplace( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable 
SqlNodeList clusteredBy, - @Nullable SqlNode replaceTimeQuery, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat, + @Nullable SqlNode replaceTimeQuery ) { super( @@ -137,7 +137,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) if (getExportFileFormat() != null) { writer.keyword("AS"); - writer.print(getExportFileFormat()); + writer.print(getExportFileFormat().toString()); writer.newlineAndIndent(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 2cda011848b5..f2d0408b491d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -283,7 +283,7 @@ public final RelNode convert(SqlNode sql) public RelRoot rel(SqlNode sql) { ensure(CalcitePlanner.State.STATE_4_VALIDATED); - SqlNode validatedSqlNode = Objects.requireNonNull( + Objects.requireNonNull( this.validatedSqlNode, "validatedSqlNode is null. Need to call #validate() first" ); @@ -295,11 +295,11 @@ public RelRoot rel(SqlNode sql) final SqlToRelConverter.Config config = sqlToRelConverterConfig.withTrimUnusedFields(false); final SqlToRelConverter sqlToRelConverter = - new SqlToRelConverter(this, validator, + new DruidSqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config ); RelRoot root = - sqlToRelConverter.convertQuery(validatedSqlNode, false, true); + sqlToRelConverter.convertQuery(sql, false, true); root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); final RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java new file mode 100644 index 000000000000..628e47686631 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.expression.AuthorizableOperator; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlReplace; +import org.apache.druid.sql.calcite.parser.SqlGranularityLiteral; + +import java.util.HashSet; +import java.util.Set; + +public class DruidSqlIngestOperator extends SqlSpecialOperator implements AuthorizableOperator +{ + public static final SqlSpecialOperator INSERT_OPERATOR = + new DruidSqlInsertOperator(); + public static final SqlSpecialOperator REPLACE_OPERATOR = + new DruidSqlReplaceOperator(); + + public static class DruidSqlInsertOperator extends DruidSqlIngestOperator + { + public DruidSqlInsertOperator() + { + super("INSERT"); + } + + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, + SqlParserPos pos, + SqlNode... operands + ) + { + return new DruidSqlInsert( + pos, + (SqlNodeList) operands[0], + operands[1], + operands[2], + (SqlNodeList) operands[3], + (SqlGranularityLiteral) operands[4], + (SqlNodeList) operands[5], + (SqlIdentifier) operands[6] + ); + } + } + + public static class DruidSqlReplaceOperator extends DruidSqlIngestOperator + { + public DruidSqlReplaceOperator() + { + super("REPLACE"); + } + + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, + SqlParserPos pos, + SqlNode... operands + ) + { + return new DruidSqlReplace( + pos, + (SqlNodeList) operands[0], + operands[1], + operands[2], + (SqlNodeList) operands[3], + (SqlGranularityLiteral) operands[4], + (SqlNodeList) operands[5], + (SqlIdentifier) operands[6], + operands[7] + ); + } + } + + public DruidSqlIngestOperator(String name) + { + super(name, SqlKind.INSERT); + } + + @Override + public Set computeResources(SqlCall call, boolean inputSourceTypeSecurityEnabled) + { + // resource actions are computed in the respective ingest handlers. + return new HashSet<>(); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java new file mode 100644 index 000000000000..d5f68748337a --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable.ViewExpander; +import org.apache.calcite.prepare.Prepare.CatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; + +public class DruidSqlToRelConverter extends SqlToRelConverter +{ + public DruidSqlToRelConverter( + final ViewExpander viewExpander, + final SqlValidator validator, + final CatalogReader catalogReader, + RelOptCluster cluster, + final SqlRexConvertletTable convertletTable, + final Config config + ) + { + super(viewExpander, validator, catalogReader, cluster, convertletTable, config); + } + + /** + * Convert a Druid {@code INSERT} or {@code REPLACE} statement. The code is the same + * as the normal conversion, except we don't actually create the final modify node. + * Druid has its own special way to handle inserts. (This should probably change in + * some future, but doing so requires changes in the SQL engine and MSQ, which is a bit + * invasive.) + */ + @Override + protected RelNode convertInsert(SqlInsert call) + { + // Get the target type: the column types we want to write into the target datasource. + final RelDataType targetRowType = validator.getValidatedNodeType(call); + assert targetRowType != null; + + // Convert the underlying SELECT. We pushed the CLUSTERED BY clause into the SELECT + // as its ORDER BY. We claim this is the top query because MSQ doesn't actually + // use the Calcite insert node. + RelNode sourceRel = convertQueryRecursive(call.getSource(), true, targetRowType).project(); + + // We omit the column mapping and insert node that Calcite normally provides. + // Presumably MSQ does these its own way. 
+ return sourceRel; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 390ddf96bafb..90aa21907ac8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -19,32 +19,72 @@ package org.apache.druid.sql.calcite.planner; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.prepare.BaseDruidSqlValidator; import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.runtime.CalciteException; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.IdentifierNamespace; +import org.apache.calcite.sql.validate.SqlValidatorException; +import org.apache.calcite.sql.validate.SqlValidatorNamespace; import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.calcite.sql.validate.SqlValidatorTable; +import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.calcite.parser.DruidSqlIngest; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; +import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier; import org.apache.druid.sql.calcite.run.EngineFeature; +import org.apache.druid.sql.calcite.table.DatasourceTable; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + /** * Druid extended SQL validator. (At present, it doesn't actually * have any extensions yet, but it will soon.) */ class DruidSqlValidator extends BaseDruidSqlValidator { + private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); + + // Copied here from MSQE since that extension is not visible here. 
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
   private final PlannerContext plannerContext;
 
   protected DruidSqlValidator(
@@ -113,6 +153,302 @@ public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullabl
     super.validateWindow(windowOrId, scope, call);
   }
 
+  @Override
+  public void validateInsert(final SqlInsert insert)
+  {
+    final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw InvalidSqlInput.exception("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    final String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw InvalidSqlInput.exception(
+          "Operation [%s] cannot be run with a target column list, given [%s (%s)]",
+          operationName,
+          ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+      );
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+    // The target is a new or existing datasource.
+    final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    final DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    if (!(ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier)) {
+      Granularity effectiveGranularity = getEffectiveGranularity(operationName, ingestNode, tableMetadata);
+      // Note: though this is the validator, we cheat a bit and write the target
+      // granularity into the query context. Perhaps this step should be done
+      // during conversion; however, we've just worked out the granularity, so we
+      // do it here instead.
+      try {
+        plannerContext.queryContextMap().put(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+            plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity)
+        );
+      }
+      catch (JsonProcessingException e) {
+        throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity);
+      }
+    }
+
+    // The source must be a SELECT.
+    final SqlNode source = insert.getSource();
+
+    // Validate the source statement.
+    // Because of the non-standard Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know the column names, and we match by name.) Thus, we'd have to validate (to know names
+    // and types) to get the target types, but we need the target types to validate. Catch-22.
+    // So, we punt.
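+    // "Punt" here means: validate the source against an unknown target type, then
+    // derive the target type from the validated source row type (see validateTargetType()).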
+    final SqlValidatorScope scope;
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+      scope = null;
+    } else {
+      scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    final RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Determine the output (target) schema.
+    final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node.
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size.
+    if (tableMetadata != null && !plannerContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+      final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        plannerContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID.
+    final SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw InvalidSqlInput.exception("Operation [%s] requires a target table", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    final int n = destId.names.size();
+    if (n > 2) {
+      throw InvalidSqlInput.exception("Druid does not support 3+ part names: [%s]", destId);
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !plannerContext.getPlannerToolbox().druidSchemaName().equals(destId.names.get(0))) {
+      throw InvalidSqlInput.exception(
+          "Table [%s] does not support operation [%s] because it is not a Druid datasource",
+          destId,
+          operationName
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw InvalidSqlInput.exception(
+            "Table [%s] does not support operation [%s] because it is not a Druid datasource",
+            destId,
+            operationName
+        );
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but it's the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage()
+          .contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict" and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (plannerContext.getPlannerToolbox().catalogResolver().ingestRequiresExistingTable()) {
+          throw InvalidSqlInput.exception("Cannot %s into [%s] because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
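+        // IdUtils.validateId() enforces Druid's identifier rules (for example, a
+        // non-empty name without disallowed characters) before we accept the new name.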
+        IdUtils.validateId("table", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Gets the effective PARTITIONED BY granularity. Resolves the granularity from the one
+   * specified on the ingest node and the one in the table metadata as stored in the catalog,
+   * if any. Mismatches between the two granularities are allowed if both are specified, with
+   * the granularity specified on the ingest node taken to be the effective granularity. If no
+   * granularity is specified on either the ingest node or in the table's catalog entry, an
+   * error is thrown.
+   *
+   * @param operationName The operation name
+   * @param ingestNode    The ingest node.
+   * @param tableMetadata The table metadata as stored in the catalog, if any.
+   *
+   * @return The effective granularity
+   * @throws org.apache.druid.error.DruidException indicating invalid SQL if neither the ingest
+   * node nor the table metadata for the target table defines a PARTITIONED BY granularity.
+   */
+  private Granularity getEffectiveGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      @Nullable final DatasourceFacade tableMetadata
+  )
+  {
+    Granularity effectiveGranularity = null;
+    final Granularity ingestionGranularity = ingestNode.getPartitionedBy() != null
+        ? ingestNode.getPartitionedBy().getGranularity()
+        : null;
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode, ingestionGranularity);
+      effectiveGranularity = ingestionGranularity;
+    } else {
+      final Granularity definedGranularity = tableMetadata == null
+          ? null
+          : tableMetadata.segmentGranularity();
+      if (definedGranularity != null) {
+        // Should already have been checked when creating the catalog entry.
+        DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, definedGranularity);
+        effectiveGranularity = definedGranularity;
+      }
+    }
+
+    if (effectiveGranularity == null) {
+      throw InvalidSqlInput.exception(
+          "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.",
+          operationName);
+    }
+
+    return effectiveGranularity;
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would insert
+   * a project operator after the SELECT before the write to cast columns from the
+   * input type to the (compatible) defined output type. Druid doesn't work that way.
+   * In MSQ, the output type is just the input type. If the user wants to control the
+   * output type, then the user must manually insert any required CAST: Druid is not
+   * in the business of changing the type to suit the catalog.
+   *

+   * As a result, we first propagate column names and types using Druid rules: the
+   * output is exactly what SELECT says it is. We then apply restrictions from the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(
+      SqlValidatorScope scope,
+      final IdentifierNamespace insertNs,
+      SqlInsert insert,
+      RelRecordType sourceType,
+      DatasourceFacade tableMetadata
+  )
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (final RelDataTypeField sourceField : sourceFields) {
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
+        throw InvalidSqlInput.exception(
+            "Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually "
+            + "the result of applying a function without having an AS clause; please ensure that all function calls "
+            + "are named with an AS clause as in \"func(X) as myColumn\"."
+        );
+      }
+    }
+    if (tableMetadata == null) {
+      return sourceType;
+    }
+    final boolean isStrict = tableMetadata.isSealed();
+    final List<Pair<String, RelDataType>> fields = new ArrayList<>();
+    for (RelDataTypeField sourceField : sourceFields) {
+      final String colName = sourceField.getName();
+      final DatasourceFacade.ColumnFacade definedCol = tableMetadata.column(colName);
+      if (definedCol == null) {
+        if (isStrict) {
+          throw InvalidSqlInput.exception(
+              "Column [%s] is not defined in the target table [%s] strict schema",
+              colName,
+              insert.getTargetTable()
+          );
+        }
+
+        // Table is not strict: add a new column based on the SELECT column.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // If the column name is defined but no type is given, then use the
+      // column type from SELECT.
+      if (!definedCol.hasType()) {
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // Both the column name and type are provided. Use the name and type
+      // from the catalog.
+      // Note to future readers: this check is preliminary. It works for the
+      // simple column types and has not yet been extended to complex types, aggregates,
+      // types defined in extensions, etc. It may be that SQL
+      // has types that Druid cannot store. This may crop up with types defined in
+      // extensions which are not loaded. Those details are not known at the time
+      // of this code, so we are not yet in a position to make the right decision.
+      // This is a task to be revisited when we have more information.
+      final String sqlTypeName = definedCol.sqlStorageType();
+      if (sqlTypeName == null) {
+        // Don't know the storage type. Just skip this one: Druid types are
+        // fluid, so let Druid sort out what to store. This is probably not the right
+        // answer, but should avoid problems until full type system support is completed.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+      RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName));
+      fields.add(Pair.of(
+          colName,
+          typeFactory.createTypeWithNullability(relType, true)
+      ));
+    }
+
+    // Perform the SQL-standard check: that the SELECT column can be
+    // converted to the target type. This check is retained to mimic SQL
+    // behavior, but doesn't do anything because we enforced exact type
+    // matches above.
+ final RelDataType targetType = typeFactory.createStructType(fields); + final SqlValidatorTable target = insertNs.resolve().getTable(); + checkTypeAssignment(scope, target, sourceType, targetType, insert); + return targetType; + } + private boolean isPrecedingOrFollowing(@Nullable SqlNode bound) { if (bound == null) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 0448a7245f82..4862f2821827 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -33,7 +33,6 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.tools.ValidationException; -import org.apache.calcite.util.Pair; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidSqlInput; @@ -55,28 +54,23 @@ import org.apache.druid.storage.ExportStorageProvider; import java.util.List; -import java.util.regex.Pattern; public abstract class IngestHandler extends QueryHandler { - private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); - - protected final Granularity ingestionGranularity; + protected Granularity ingestionGranularity; protected IngestDestination targetDatasource; + private SqlNode validatedQueryNode; + IngestHandler( HandlerContext handlerContext, - DruidSqlIngest ingestNode, - SqlNode queryNode, SqlExplain explain ) { - super(handlerContext, queryNode, explain); - ingestionGranularity = ingestNode.getPartitionedBy() != null ? ingestNode.getPartitionedBy().getGranularity() : null; - handlerContext.hook().captureInsert(ingestNode); + super(handlerContext, explain); } - protected static SqlNode convertQuery(DruidSqlIngest sqlNode) + protected static SqlNode convertSourceQuery(DruidSqlIngest sqlNode) { SqlNode query = sqlNode.getSource(); @@ -98,6 +92,7 @@ protected static SqlNode convertQuery(DruidSqlIngest sqlNode) if (!query.isA(SqlKind.QUERY)) { throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); } + return query; } @@ -123,7 +118,7 @@ private void validateExport() .build("Export statements do not support a PARTITIONED BY or CLUSTERED BY clause."); } - final String exportFileFormat = ingestNode().getExportFileFormat(); + final SqlIdentifier exportFileFormat = ingestNode().getExportFileFormat(); if (exportFileFormat == null) { throw InvalidSqlInput.exception( "Exporting rows into an EXTERN destination requires an AS clause to specify the format, but none was found.", @@ -132,7 +127,7 @@ private void validateExport() } else { handlerContext.plannerContext().queryContextMap().put( DruidSqlIngest.SQL_EXPORT_FILE_FORMAT, - exportFileFormat + exportFileFormat.toString() ); } } @@ -143,13 +138,6 @@ public void validate() if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) { validateExport(); } else { - if (ingestNode().getPartitionedBy() == null) { - throw InvalidSqlInput.exception( - "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", - operationName() - ); - } - if (ingestNode().getExportFileFormat() != null) { throw InvalidSqlInput.exception( "The AS clause should only be specified while exporting rows into an EXTERN destination.", @@ -158,19 +146,6 @@ public void validate() } } - try 
{ - PlannerContext plannerContext = handlerContext.plannerContext(); - if (ingestionGranularity != null) { - plannerContext.queryContextMap().put( - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity) - ); - } - } - catch (JsonProcessingException e) { - throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", ingestionGranularity); - } - super.validate(); // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes // the number of rows inserted to be limited which is likely to be confusing and unintended. if (handlerContext.queryContextMap().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) { @@ -180,9 +155,31 @@ public void validate() operationName() ); } + DruidSqlIngest ingestNode = ingestNode(); + DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode); + validatedQueryNode = validatedNode.getSource(); + // This context key is set during validation in + // org.apache.druid.sql.calcite.planner.DruidSqlValidator.validateInsert. + String effectiveGranularity = (String) handlerContext.queryContextMap() + .get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); + try { + ingestionGranularity = effectiveGranularity != null + ? handlerContext.jsonMapper().readValue(effectiveGranularity, Granularity.class) + : null; + } + catch (JsonProcessingException e) { + // this should never happen, since the granularity value is validated before being written to contextMap. + throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity); + } targetDatasource = validateAndGetDataSourceForIngest(); } + @Override + protected SqlNode validatedQueryNode() + { + return validatedQueryNode; + } + @Override protected RelDataType returnedRowType() { @@ -202,27 +199,11 @@ protected RelDataType returnedRowType() private IngestDestination validateAndGetDataSourceForIngest() { final SqlInsert insert = ingestNode(); - if (insert.isUpsert()) { - throw InvalidSqlInput.exception("UPSERT is not supported."); - } - - if (insert.getTargetColumnList() != null) { - throw InvalidSqlInput.exception( - "Operation [%s] cannot be run with a target column list, given [%s (%s)]", - operationName(), - insert.getTargetTable(), insert.getTargetColumnList() - ); - } final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); final IngestDestination dataSource; - if (tableIdentifier.names.isEmpty()) { - // I don't think this can happen, but include a branch for it just in case. 
- throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build("Operation [%s] requires a target table", operationName()); - } else if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) { + if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) { ExternalDestinationSqlIdentifier externalDestination = ((ExternalDestinationSqlIdentifier) tableIdentifier); ExportStorageProvider storageProvider = externalDestination.toExportStorageProvider(handlerContext.jsonMapper()); dataSource = new ExportDestination(storageProvider); @@ -264,7 +245,6 @@ protected PlannerResult planForDruid() throws ValidationException @Override protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException { - validateColumnsForIngestion(rootQueryRel); return handlerContext.engine().buildQueryMakerForInsert( targetDatasource, rootQueryRel, @@ -272,20 +252,6 @@ protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws Validati ); } - private void validateColumnsForIngestion(RelRoot rootQueryRel) - { - // Check that there are no unnamed columns in the insert. - for (Pair field : rootQueryRel.fields) { - if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) { - throw InvalidSqlInput.exception( - "Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually " - + "the result of applying a function without having an AS clause, please ensure that all function calls" - + "are named with an AS clause as in \"func(X) as myColumn\"." - ); - } - } - } - /** * Handler for the INSERT statement. */ @@ -299,13 +265,24 @@ public InsertHandler( SqlExplain explain ) { - super( - handlerContext, - sqlNode, - convertQuery(sqlNode), - explain - ); - this.sqlNode = sqlNode; + super(handlerContext, explain); + this.sqlNode = convertQuery(sqlNode); + handlerContext.hook().captureInsert(sqlNode); + } + + protected static DruidSqlInsert convertQuery(DruidSqlIngest sqlNode) + { + SqlNode query = convertSourceQuery(sqlNode); + + return DruidSqlInsert.create(new SqlInsert( + sqlNode.getParserPosition(), + (SqlNodeList) sqlNode.getOperandList().get(0), + sqlNode.getOperandList().get(1), + query, + (SqlNodeList) sqlNode.getOperandList().get(3)), + sqlNode.getPartitionedBy(), + sqlNode.getClusteredBy(), + sqlNode.getExportFileFormat()); } @Override @@ -355,11 +332,29 @@ public ReplaceHandler( { super( handlerContext, - sqlNode, - convertQuery(sqlNode), explain ); - this.sqlNode = sqlNode; + this.sqlNode = convertQuery(sqlNode); + handlerContext.hook().captureInsert(sqlNode); + } + + protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode) + { + SqlNode query = convertSourceQuery(sqlNode); + + return DruidSqlReplace.create( + new SqlInsert( + sqlNode.getParserPosition(), + (SqlNodeList) sqlNode.getOperandList().get(0), + sqlNode.getOperandList().get(1), + query, + (SqlNodeList) sqlNode.getOperandList().get(3) + ), + sqlNode.getPartitionedBy(), + sqlNode.getClusteredBy(), + sqlNode.getExportFileFormat(), + sqlNode.getReplaceTimeQuery() + ); } @Override @@ -390,12 +385,12 @@ public void validate() ); } + super.validate(); List replaceIntervalsList = DruidSqlParserUtils.validateQueryAndConvertToIntervals( replaceTimeQuery, ingestionGranularity, handlerContext.timeZone() ); - super.validate(); if (replaceIntervalsList != null) { replaceIntervals = String.join(",", replaceIntervalsList); handlerContext.queryContextMap().put( diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 57d4e08b0719..3e7f58ec0674 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -93,27 +93,24 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand { static final EmittingLogger log = new EmittingLogger(QueryHandler.class); - protected SqlNode queryNode; protected SqlExplain explain; - protected SqlNode validatedQueryNode; private boolean isPrepared; protected RelRoot rootQueryRel; private PrepareResult prepareResult; protected RexBuilder rexBuilder; - public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain) + public QueryHandler(HandlerContext handlerContext, SqlExplain explain) { super(handlerContext); - this.queryNode = sqlNode; this.explain = explain; } - @Override - public void validate() + protected SqlNode validate(SqlNode root) { CalcitePlanner planner = handlerContext.planner(); + SqlNode validatedQueryNode; try { - validatedQueryNode = planner.validate(rewriteParameters()); + validatedQueryNode = planner.validate(rewriteParameters(root)); } catch (ValidationException e) { throw DruidPlanner.translateException(e); @@ -126,9 +123,10 @@ public void validate() ); validatedQueryNode.accept(resourceCollectorShuttle); resourceActions = resourceCollectorShuttle.getResourceActions(); + return validatedQueryNode; } - private SqlNode rewriteParameters() + private SqlNode rewriteParameters(SqlNode original) { // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} @@ -140,9 +138,9 @@ private SqlNode rewriteParameters() // contains parameters, but no values were provided. 
PlannerContext plannerContext = handlerContext.plannerContext(); if (plannerContext.getParameters().isEmpty()) { - return queryNode; + return original; } else { - return queryNode.accept(new SqlParameterizerShuttle(plannerContext)); + return original.accept(new SqlParameterizerShuttle(plannerContext)); } } @@ -153,6 +151,7 @@ public void prepare() return; } isPrepared = true; + SqlNode validatedQueryNode = validatedQueryNode(); rootQueryRel = handlerContext.planner().rel(validatedQueryNode); handlerContext.hook().captureQueryRel(rootQueryRel); final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); @@ -177,6 +176,8 @@ public PrepareResult prepareResult() return prepareResult; } + protected abstract SqlNode validatedQueryNode(); + protected abstract RelDataType returnedRowType(); private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory) @@ -690,13 +691,17 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e public static class SelectHandler extends QueryHandler { + private final SqlNode queryNode; + private SqlNode validatedQueryNode; + public SelectHandler( HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain ) { - super(handlerContext, sqlNode, explain); + super(handlerContext, explain); + this.queryNode = sqlNode; } @Override @@ -705,7 +710,13 @@ public void validate() if (!handlerContext.plannerContext().featureAvailable(EngineFeature.CAN_SELECT)) { throw InvalidSqlInput.exception("Cannot execute SELECT with SQL engine [%s]", handlerContext.engine().name()); } - super.validate(); + validatedQueryNode = validate(queryNode); + } + + @Override + protected SqlNode validatedQueryNode() + { + return validatedQueryNode; } @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java index cc4b2a0fec49..4a97367fcd19 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import org.apache.calcite.avatica.SqlType; import org.apache.druid.error.DruidException; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.initialization.DruidModule; @@ -37,6 +38,7 @@ import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.destination.ExportDestination; +import org.apache.druid.sql.http.SqlParameter; import org.apache.druid.storage.StorageConfig; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.local.LocalFileExportStorageProvider; @@ -46,6 +48,7 @@ import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; +import java.util.Collections; import java.util.List; public class CalciteExportTest extends CalciteIngestionDmlTest @@ -176,6 +179,59 @@ public void testInsertIntoExtern() .verify(); } + + @Test + public void testInsertIntoExternParameterized() + { + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO EXTERN(%s()) " + + "AS CSV " + + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) + .expectQuery( + Druids.newScanQueryBuilder() + .dataSource( + "foo" + ) + 
.intervals(querySegmentSpec(Filtration.eternity())) + .filters(equality("dim2", "val", ColumnType.STRING)) + .columns("dim2") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .build() + ) + .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) + .verify(); + } + + // Disabled until replace supports external destinations. To be enabled after that point. + @Test + @Ignore + public void testReplaceIntoExternParameterized() + { + testIngestionQuery() + .sql(StringUtils.format("REPLACE INTO EXTERN(%s()) " + + "AS CSV " + + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) + .expectQuery( + Druids.newScanQueryBuilder() + .dataSource( + "foo" + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(equality("dim2", "val", ColumnType.STRING)) + .columns("dim2") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .build() + ) + .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) + .verify(); + } + @Test public void testExportWithoutFormat() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index 515796948fa1..eded1b7fb776 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1679,7 +1679,6 @@ public void testErrorWithUnableToConstructColumnSignatureWithExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1708,7 +1707,6 @@ public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1736,7 +1734,6 @@ public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1765,7 +1762,6 @@ public void testErrorWhenInputSourceInvalid() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher(