From fafcc76a800ff3992c237af75ba983551138db67 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 27 Mar 2024 15:48:02 -0400 Subject: [PATCH 1/9] * fix --- .../druid/sql/calcite/table/RowSignatures.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java index 0234d6b5319f..e02cd6458945 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java @@ -41,6 +41,8 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.sql.calcite.expression.SimpleExtraction; import org.apache.druid.sql.calcite.planner.Calcites; @@ -222,7 +224,17 @@ public ComplexSqlType( ) { super(typeName, isNullable, null); - this.columnType = columnType; + // homogenize complex type names to common name + final ComplexMetricSerde serde = columnType.getComplexTypeName() != null + ? 
+ ComplexMetrics.getSerdeForType(columnType.getComplexTypeName()) + : null; + + if (serde != null) { + this.columnType = ColumnType.ofComplex(serde.getTypeName()); + } else { + this.columnType = columnType; + } this.computeDigest(); } From fe2c4070b8eb5ebfcbe55d33005372ce7cf35e20 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 28 Mar 2024 14:50:35 -0400 Subject: [PATCH 2/9] * fix --- .../sql/TDigestGenerateSketchSqlAggregator.java | 5 ++--- .../aggregation/datasketches/hll/HllSketchModule.java | 2 ++ ...etchEstimateWithErrorBoundsOperatorConversion.java | 5 +++-- .../hll/sql/HllSketchObjectSqlAggregator.java | 5 +++-- .../sql/DoublesSketchObjectSqlAggregator.java | 5 +++-- ...etchEstimateWithErrorBoundsOperatorConversion.java | 5 +++-- .../bloom/sql/BloomFilterSqlAggregator.java | 5 ++--- .../sql/calcite/expression/OperatorConversions.java | 11 +++++++++++ 8 files changed, 29 insertions(+), 14 deletions(-) diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java index cb9b95423a67..ede0007fd9fe 100644 --- a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java @@ -25,9 +25,7 @@ import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Optionality; import org.apache.druid.java.util.common.StringUtils; import 
org.apache.druid.query.aggregation.AggregatorFactory; @@ -39,6 +37,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.DefaultOperandTypeChecker; import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; @@ -140,7 +139,7 @@ private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction NAME, null, SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.OTHER), + OperatorConversions.complexReturnTypeWithNullability(TDigestSketchAggregatorFactory.TYPE, false), null, // Validation for signatures like 'TDIGEST_GENERATE_SKETCH(column)' and // 'TDIGEST_GENERATE_SKETCH(column, compression)' diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java index ea2f11ca785b..a44f0cee6633 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java @@ -35,6 +35,7 @@ import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchObjectSqlAggregator; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchSetUnionOperatorConversion; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchToStringOperatorConversion; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.sql.guice.SqlBindings; @@ -48,6 +49,7 @@ public class HllSketchModule implements DruidModule { 
public static final String TYPE_NAME = "HLLSketch"; // common type name to be associated with segment data + public static final ColumnType TYPE = ColumnType.ofComplex(TYPE_NAME); public static final String BUILD_TYPE_NAME = "HLLSketchBuild"; public static final String MERGE_TYPE_NAME = "HLLSketchMerge"; public static final String TO_STRING_TYPE_NAME = "HLLSketchToString"; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java index cc89d4c7a560..9909a2175642 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java @@ -26,9 +26,9 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.expression.DruidExpression; @@ -47,7 +47,8 @@ public class HllSketchEstimateWithErrorBoundsOperatorConversion implements SqlOp .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER) .requiredOperandCount(1) - .returnTypeNonNull(SqlTypeName.OTHER) + .returnTypeInference( + 
OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) .build(); @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java index 9d8ade636f1b..375cb8c32a61 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java @@ -23,9 +23,9 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.OperatorConversions; @@ -42,7 +42,8 @@ public class HllSketchObjectSqlAggregator extends HllSketchBaseSqlAggregator imp .operandTypeInference(InferTypes.VARCHAR_1024) .requiredOperandCount(1) .literalOperands(1, 2) - .returnTypeNonNull(SqlTypeName.OTHER) + .returnTypeInference( + OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION) .build(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java index 8331ab720640..30318ae1732b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java @@ -27,10 +27,10 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.aggregation.Aggregation; @@ -52,7 +52,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator OperatorConversions.aggregatorBuilder(NAME) .operandNames("column", "k") .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.EXACT_NUMERIC) - .returnTypeNonNull(SqlTypeName.OTHER) + .returnTypeInference( + OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) .requiredOperandCount(1) .literalOperands(1) .functionCategory(SqlFunctionCategory.NUMERIC) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java index 459a12867bcc..df53f347cdb6 
100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java @@ -26,10 +26,10 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator; +import org.apache.druid.query.aggregation.datasketches.theta.SketchModule; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; @@ -46,7 +46,8 @@ public class ThetaSketchEstimateWithErrorBoundsOperatorConversion implements Sql private static final SqlFunction SQL_FUNCTION = OperatorConversions .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER) - .returnTypeNonNull(SqlTypeName.OTHER) + .returnTypeInference( + OperatorConversions.complexReturnTypeWithNullability(SketchModule.BUILD_TYPE, false)) .build(); @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java index cb73d94ef7ea..9bda5dc4e9a7 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java +++ 
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java @@ -25,9 +25,7 @@ import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Optionality; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -41,6 +39,7 @@ import org.apache.druid.sql.calcite.expression.DefaultOperandTypeChecker; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; @@ -175,7 +174,7 @@ private static class BloomFilterSqlAggFunction extends SqlAggFunction NAME, null, SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.OTHER), + OperatorConversions.complexReturnTypeWithNullability(BloomFilterAggregatorFactory.TYPE, false), null, // Allow signatures like 'BLOOM_FILTER(column, maxNumEntries)' DefaultOperandTypeChecker diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java index 450d62082406..c6bbb59e27f2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java @@ -52,11 +52,13 @@ import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import 
org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.DruidTypeSystem; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; import java.util.Arrays; @@ -779,4 +781,13 @@ public static DirectOperatorConversion druidUnaryDoubleFn(String sqlOperator, St druidFunctionName ); } + + public static SqlReturnTypeInference complexReturnTypeWithNullability(ColumnType columnType, boolean nullable) + { + return opBinding -> RowSignatures.makeComplexType( + opBinding.getTypeFactory(), + columnType, + nullable + ); + } } From 357e6a7c89a3ea0b4b3e3f9f332edae7cd34d8fb Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 29 Mar 2024 12:58:50 -0400 Subject: [PATCH 3/9] * address review comments --- .../sql/TDigestGenerateSketchSqlAggregator.java | 4 ++-- ...etchEstimateWithErrorBoundsOperatorConversion.java | 5 ++--- .../hll/sql/HllSketchObjectSqlAggregator.java | 4 +++- .../sql/DoublesSketchObjectSqlAggregator.java | 6 ++++-- .../aggregation/datasketches/theta/SketchModule.java | 1 + ...etchEstimateWithErrorBoundsOperatorConversion.java | 4 +++- .../bloom/sql/BloomFilterSqlAggregator.java | 3 +-- .../sql/calcite/expression/OperatorConversions.java | 11 ----------- .../apache/druid/sql/calcite/planner/Calcites.java | 9 +++++++++ 9 files changed, 25 insertions(+), 22 deletions(-) diff --git a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java index ede0007fd9fe..1777ce5c5449 100644 --- 
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java +++ b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java @@ -37,7 +37,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.DefaultOperandTypeChecker; import org.apache.druid.sql.calcite.expression.DruidExpression; -import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; @@ -139,7 +139,7 @@ private static class TDigestGenerateSketchSqlAggFunction extends SqlAggFunction NAME, null, SqlKind.OTHER_FUNCTION, - OperatorConversions.complexReturnTypeWithNullability(TDigestSketchAggregatorFactory.TYPE, false), + Calcites.complexReturnTypeWithNullability(TDigestSketchAggregatorFactory.TYPE, false), null, // Validation for signatures like 'TDIGEST_GENERATE_SKETCH(column)' and // 'TDIGEST_GENERATE_SKETCH(column, compression)' diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java index 9909a2175642..0c2e8e0d6681 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java @@ -26,9 +26,9 @@ import 
org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; -import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.expression.DruidExpression; @@ -47,8 +47,7 @@ public class HllSketchEstimateWithErrorBoundsOperatorConversion implements SqlOp .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER) .requiredOperandCount(1) - .returnTypeInference( - OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) + .returnTypeNullableArrayWithNullableElements(SqlTypeName.DOUBLE) .build(); @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java index 375cb8c32a61..0e466bcaf0a0 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java @@ -29,6 +29,7 @@ import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.planner.Calcites; import java.util.Collections; @@ -43,7 +44,8 @@ public class HllSketchObjectSqlAggregator extends 
HllSketchBaseSqlAggregator imp .requiredOperandCount(1) .literalOperands(1, 2) .returnTypeInference( - OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) + Calcites.complexReturnTypeWithNullability(HllSketchModule.TYPE, false) + ) .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION) .build(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java index 30318ae1732b..15b15b0dc21e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java @@ -30,14 +30,15 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; -import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.Aggregations; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import 
org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; @@ -53,7 +54,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator .operandNames("column", "k") .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.EXACT_NUMERIC) .returnTypeInference( - OperatorConversions.complexReturnTypeWithNullability(HllSketchModule.TYPE, false)) + Calcites.complexReturnTypeWithNullability(DoublesSketchModule.TYPE, false) + ) .requiredOperandCount(1) .literalOperands(1) .functionCategory(SqlFunctionCategory.NUMERIC) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchModule.java index 979f3f2579f9..5a67aa3d2f17 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -47,6 +47,7 @@ public class SketchModule implements DruidModule public static final String THETA_SKETCH_MERGE_AGG = "thetaSketchMerge"; public static final String THETA_SKETCH_BUILD_AGG = "thetaSketchBuild"; + public static final ColumnType THETA_SKETCH_TYPE = ColumnType.ofComplex(THETA_SKETCH); public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(THETA_SKETCH_BUILD_AGG); public static final ColumnType MERGE_TYPE = ColumnType.ofComplex(THETA_SKETCH_MERGE_AGG); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java index df53f347cdb6..6b07f98d6687 100644 --- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java @@ -35,6 +35,7 @@ import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; @@ -47,7 +48,8 @@ public class ThetaSketchEstimateWithErrorBoundsOperatorConversion implements Sql .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER) .returnTypeInference( - OperatorConversions.complexReturnTypeWithNullability(SketchModule.BUILD_TYPE, false)) + Calcites.complexReturnTypeWithNullability(SketchModule.THETA_SKETCH_TYPE, false) + ) .build(); @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java index 9bda5dc4e9a7..5beb6c642326 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java @@ -39,7 +39,6 @@ import org.apache.druid.sql.calcite.expression.DefaultOperandTypeChecker; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.Expressions; -import org.apache.druid.sql.calcite.expression.OperatorConversions; import 
org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; @@ -174,7 +173,7 @@ private static class BloomFilterSqlAggFunction extends SqlAggFunction NAME, null, SqlKind.OTHER_FUNCTION, - OperatorConversions.complexReturnTypeWithNullability(BloomFilterAggregatorFactory.TYPE, false), + Calcites.complexReturnTypeWithNullability(BloomFilterAggregatorFactory.TYPE, false), null, // Allow signatures like 'BLOOM_FILTER(column, maxNumEntries)' DefaultOperandTypeChecker diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java index c6bbb59e27f2..450d62082406 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java @@ -52,13 +52,11 @@ import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; -import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.DruidTypeSystem; import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; import java.util.Arrays; @@ -781,13 +779,4 @@ public static DirectOperatorConversion druidUnaryDoubleFn(String sqlOperator, St druidFunctionName ); } - - public static SqlReturnTypeInference complexReturnTypeWithNullability(ColumnType columnType, boolean nullable) - { - return opBinding -> RowSignatures.makeComplexType( - opBinding.getTypeFactory(), - columnType, - 
nullable - ); - } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java index b56a6d5bed50..64929c6445e1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java @@ -100,6 +100,15 @@ public class Calcites public static final SqlReturnTypeInference ARG1_NULLABLE_ARRAY_RETURN_TYPE_INFERENCE = new Arg1NullableArrayTypeInference(); + public static SqlReturnTypeInference complexReturnTypeWithNullability(ColumnType columnType, boolean nullable) + { + return opBinding -> RowSignatures.makeComplexType( + opBinding.getTypeFactory(), + columnType, + nullable + ); + } + private Calcites() { // No instantiation. From fd6cb248789568c5877ee670c5984df275fa9bd6 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 1 Apr 2024 16:45:07 -0400 Subject: [PATCH 4/9] * fix --- .../druid/catalog/sql/CatalogInsertTest.java | 8 - .../calcite/planner/DruidSqlValidator.java | 39 ++-- .../CalciteCatalogIngestionDmlTest.java | 4 +- .../sql/calcite/CalciteCatalogInsertTest.java | 203 ++++++++++++++++++ .../calcite/CalciteCatalogReplaceTest.java | 203 ++++++++++++++++++ 5 files changed, 435 insertions(+), 22 deletions(-) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java index d4a97e666ed4..2a6aae755e7b 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -92,14 +92,6 @@ public void buildDatasources() createTableMetadata(tableBuilder.build()); }); - DatasourceFacade catalogMetadata = - ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); - TableBuilder 
tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); - catalogMetadata.columnFacades().forEach( - columnFacade -> { - tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); - } - ); } private void createTableMetadata(TableMetadata table) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 7b7c1b81c4dd..2531af065cad 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -41,7 +41,6 @@ import org.apache.calcite.sql.SqlWindow; import org.apache.calcite.sql.SqlWith; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.IdentifierNamespace; @@ -54,18 +53,19 @@ import org.apache.calcite.util.Static; import org.apache.calcite.util.Util; import org.apache.druid.catalog.model.facade.DatasourceFacade; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.Types; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier; import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.RowSignatures; import 
org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; @@ -474,18 +474,21 @@ private RelDataType validateTargetType( continue; } SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); - RelDataType relType = typeFactory.createSqlType(sqlTypeName); - if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) { - fields.add(Pair.of( - colName, - relType - )); + RelDataType relType; + if (sqlTypeName != null) { + relType = typeFactory.createSqlType(sqlTypeName); } else { - fields.add(Pair.of( - colName, - typeFactory.createTypeWithNullability(relType, true) - )); + relType = RowSignatures.columnTypeToRelDataType( + typeFactory, + ColumnType.fromString(definedCol.sqlStorageType()), + sourceField.getType().isNullable() + ); } + + fields.add(Pair.of( + colName, + typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable()) + )); } // Perform the SQL-standard check: that the SELECT column can be @@ -516,7 +519,17 @@ protected void checkTypeAssignment( ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType); ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType); - if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) { + boolean incompatible; + try { + incompatible = !Objects.equals( + targetFieldColumnType, + ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType) + ); + } + catch (Types.IncompatibleTypeException e) { + incompatible = true; + } + if (incompatible) { SqlNode node = getNthExpr(query, i, sourceCount); String targetTypeString; String sourceTypeString; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java index 2d3fb5d7f114..6c91941ae971 100644 --- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -31,6 +31,7 @@ import org.apache.druid.catalog.model.table.DatasourceDefn; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.table.DatasourceTable; @@ -152,7 +153,8 @@ public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest new ColumnSpec("dim3", Columns.STRING, null), new ColumnSpec("cnt", Columns.LONG, null), new ColumnSpec("m1", Columns.FLOAT, null), - new ColumnSpec("m2", Columns.DOUBLE, null) + new ColumnSpec("m2", Columns.DOUBLE, null), + new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null) ) ), MAPPER diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java index af45896011c6..c780a040d042 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java @@ -24,6 +24,9 @@ import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.external.ExternalDataSource; @@ -197,6 +200,96 @@ public void 
testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() .verify(); } + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql("INSERT INTO foo\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + 
.setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + /** * Adding a new column during ingestion that is not defined in a sealed table should fail with * proper validation error. @@ -291,6 +384,100 @@ public void testInsertWithSourceIntoCatalogTable() .verify(); } + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
+ */ + @Test + public void testGroupByInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql("INSERT INTO foo\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM \"ext\"\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", 
ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + @Test public void testInsertIntoExistingStrictNoDefinedSchema() { @@ -317,4 +504,20 @@ public void testInsertIntoExistingWithIncompatibleTypeAssignment() "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") .verify(); } + + @Test + public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql("INSERT INTO foo\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS unique_dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java index f4c6a908ca7d..0ba5d345d8ff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java @@ -24,6 +24,9 @@ import org.apache.druid.data.input.impl.InlineInputSource; import 
org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.external.ExternalDataSource; @@ -192,6 +195,96 @@ public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() .verify(); } + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testGroupByReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql("REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + 
"GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + /** * Adding a new column during ingestion that is not defined in a sealed table should fail with * proper validation error. @@ -287,6 +380,100 @@ public void testReplaceWithSourceIntoCatalogTable() .verify(); } + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
+ */ + @Test + public void testGroupByReplaceWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM \"ext\"\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", 
"d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + @Test public void testReplaceIntoExistingStrictNoDefinedSchema() { @@ -313,4 +500,20 @@ public void testReplaceIntoExistingWithIncompatibleTypeAssignment() "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") .verify(); } + + @Test + public void testGroupByReplaceIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql("REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS unique_dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } } From 3012773a0da7efbb5026b0c0d8c821c3b87cbbe0 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 1 Apr 2024 17:23:54 -0400 Subject: [PATCH 5/9] * simplify tests --- .../druid/catalog/sql/CatalogInsertTest.java | 2 +- .../druid/catalog/sql/CatalogReplaceTest.java | 4 +- .../CalciteCatalogIngestionDmlTest.java | 518 +++++++++++++++++- .../sql/calcite/CalciteCatalogInsertTest.java | 497 +---------------- 
.../calcite/CalciteCatalogReplaceTest.java | 493 +---------------- 5 files changed, 527 insertions(+), 987 deletions(-) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java index 2a6aae755e7b..a4e927b4546e 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -69,7 +69,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void buildDatasources() { - resolvedTables.forEach((datasourceName, datasourceTable) -> { + RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java index 34011fb2205f..b0d0c707335b 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java @@ -68,7 +68,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void buildDatasources() { - resolvedTables.forEach((datasourceName, datasourceTable) -> { + RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, 
catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( @@ -92,7 +92,7 @@ public void buildDatasources() createTableMetadata(tableBuilder.build()); }); DatasourceFacade catalogMetadata = - ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); + ((DatasourceTable) RESOLVED_TABLES.get("foo")).effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java index 6c91941ae971..f7b8542e143a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -29,17 +29,44 @@ import org.apache.druid.catalog.model.TableSpec; import org.apache.druid.catalog.model.facade.DatasourceFacade; import org.apache.druid.catalog.model.table.DatasourceDefn; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.error.DruidException; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalDataSource; +import 
org.apache.druid.sql.calcite.external.Externals; +import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.jupiter.api.Test; -public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest +public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest { - public ImmutableMap resolvedTables = ImmutableMap.of( + private final String operationName; + private final String dmlPrefixPattern; + + public CalciteCatalogIngestionDmlTest() + { + this.operationName = getOperationName(); + this.dmlPrefixPattern = getDmlPrefixPattern(); + } + + public abstract String getOperationName(); + public abstract String getDmlPrefixPattern(); + + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + public static ImmutableMap RESOLVED_TABLES = ImmutableMap.of( "hourDs", new DatasourceTable( RowSignature.builder().addTimeColumn().build(), new DatasourceTable.PhysicalDatasourceMetadata( @@ -200,8 +227,6 @@ public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest ) ); - private static final ObjectMapper MAPPER = new DefaultObjectMapper(); - @Override public CatalogResolver createCatalogResolver() { @@ -212,11 +237,492 @@ public DruidTable resolveDatasource( final DatasourceTable.PhysicalDatasourceMetadata dsMetadata ) { - if (resolvedTables.get(tableName) != null) { - return resolvedTables.get(tableName); + if (RESOLVED_TABLES.get(tableName) != null) { + return RESOLVED_TABLES.get(tableName); } return dsMetadata == null ? null : new DatasourceTable(dsMetadata); } }; } + + /** + * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the + * value from the catalog. 
+ */ + @Test + public void testInsertHourGrainPartitonedByFromCatalog() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.HOUR)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is given in the catalog, and also by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertHourGrainWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then + * validation error. 
+ */ + @Test + public void testInsertNoPartitonedByFromCatalog() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + StringUtils.format("Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", operationName) + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. 
+ */ + @Test + public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. 
+ .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + 
.setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // The statement above is PARTITIONED BY ALL TIME, so the expected query + // is built with the PARTITIONED_BY_ALL_TIME query context. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a sealed table should fail with + * proper validation error. 
+ */ + @Test + public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" + ) + .verify(); + } + + + /** + * Inserting into a catalog table with a WITH source succeeds + */ + @Test + public void testInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" 
+ + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM \"ext\"\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Group by ingestion into a catalog table with a WITH source succeeds, including columns not defined in the table. 
+ */ + @Test + public void testGroupByInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM \"ext\"\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new 
DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // The statement above is PARTITIONED BY ALL TIME, so the expected query + // is built with the PARTITIONED_BY_ALL_TIME query context. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + @Test + public void testInsertIntoExistingStrictNoDefinedSchema() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "strictTableWithNoDefinedSchema") + " SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") + .verify(); + } + + @Test + public void testInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } + + @Test + public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS unique_dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + 
.expectValidationError( + DruidException.class, + "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java index c780a040d042..fff5ca9bc58d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java @@ -19,505 +19,20 @@ package org.apache.druid.sql.calcite; -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.impl.CsvInputFormat; -import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; -import org.apache.druid.query.dimension.DefaultDimensionSpec; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.Externals; -import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.junit.jupiter.api.Test; - /** * Test for INSERT DML statements for tables defined in catalog. */ public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest { - /** - * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the - * value from the catalog. 
- */ - @Test - public void testInsertHourGrainPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.HOUR)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is given in the catalog, and also by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertHourGrainWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then - * validation error. 
- */ - @Test - public void testInsertNoPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("INSERT INTO noPartitonedBy\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found." - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("INSERT INTO noPartitonedBy\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. 
- */ - @Test - public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("INSERT INTO foo\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. 
- .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. - */ - @Test - public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .add("extra4_complex", ColumnType.LONG) - .build(); - testIngestionQuery() - .sql("INSERT INTO foo\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3,\n" + - " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "GROUP BY 1,2,3,4,5,6\n" + - "PARTITIONED BY ALL TIME" - ) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - GroupByQuery.builder() - .setDataSource(externalDataSource) - .setGranularity(Granularities.ALL) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setVirtualColumns( - 
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) - ) - .setDimensions( - dimensions( - new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), - new DefaultDimensionSpec("b", "d1", ColumnType.STRING), - new DefaultDimensionSpec("c", "d3", ColumnType.LONG), - new DefaultDimensionSpec("d", "d4", ColumnType.LONG), - new DefaultDimensionSpec("e", "d5", ColumnType.STRING) - ) - ) - .setAggregatorSpecs( - new CardinalityAggregatorFactory( - "a0", - null, - ImmutableList.of( - new DefaultDimensionSpec( - "c", - "c", - ColumnType.LONG - ) - ), - false, - true - ) - ) - .setPostAggregatorSpecs( - expressionPostAgg("p0", "1", ColumnType.LONG), - expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a sealed table should fail with - * proper validation error. 
- */ - @Test - public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() - { - testIngestionQuery() - .sql("INSERT INTO fooSealed\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" - ) - .verify(); - } - - - /** - * Inserting into a catalog table with a WITH source succeeds - */ - @Test - public void testInsertWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("INSERT INTO \"foo\"\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS 
extra3\n" + - "FROM \"ext\"\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
- */ - @Test - public void testGroupByInsertWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .add("extra4_complex", ColumnType.LONG) - .build(); - testIngestionQuery() - .sql("INSERT INTO foo\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3,\n" + - " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + - "FROM \"ext\"\n" + - "GROUP BY 1,2,3,4,5,6\n" + - "PARTITIONED BY ALL TIME" - ) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - GroupByQuery.builder() - .setDataSource(externalDataSource) - .setGranularity(Granularities.ALL) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setVirtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) - ) - .setDimensions( - dimensions( - new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), - new DefaultDimensionSpec("b", "d1", 
ColumnType.STRING), - new DefaultDimensionSpec("c", "d3", ColumnType.LONG), - new DefaultDimensionSpec("d", "d4", ColumnType.LONG), - new DefaultDimensionSpec("e", "d5", ColumnType.STRING) - ) - ) - .setAggregatorSpecs( - new CardinalityAggregatorFactory( - "a0", - null, - ImmutableList.of( - new DefaultDimensionSpec( - "c", - "c", - ColumnType.LONG - ) - ), - false, - true - ) - ) - .setPostAggregatorSpecs( - expressionPostAgg("p0", "1", ColumnType.LONG), - expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - @Test - public void testInsertIntoExistingStrictNoDefinedSchema() - { - testIngestionQuery() - .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") - .verify(); - } - - @Test - public void testInsertIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getOperationName() { - testIngestionQuery() - .sql("INSERT INTO foo\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "INSERT"; } - @Test - public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getDmlPrefixPattern() { - testIngestionQuery() - .sql("INSERT INTO foo\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS unique_dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - 
DruidException.class, - "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "INSERT INTO \"%s\""; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java index 0ba5d345d8ff..aad22693c93d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java @@ -19,501 +19,20 @@ package org.apache.druid.sql.calcite; -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.impl.CsvInputFormat; -import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; -import org.apache.druid.query.dimension.DefaultDimensionSpec; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.Externals; -import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.junit.jupiter.api.Test; - /** * Test for REPLACE DML statements for tables defined in catalog. */ public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest { - /** - * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the - * value from the catalog. 
- */ - @Test - public void testReplaceHourGrainPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.HOUR)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is given in the catalog, and also by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testReplaceHourGrainWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + - "SELECT *FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then - * validation error. - */ - @Test - public void testInsertNoPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found." 
- ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. - */ - @Test - public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM TABLE(inline(\n" + - " data => 
ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. Here we just check that the - // set of columns is correct, but not their order. - .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
- */ - @Test - public void testGroupByReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .add("extra4_complex", ColumnType.LONG) - .build(); - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3,\n" + - " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "GROUP BY 1,2,3,4,5,6\n" + - "PARTITIONED BY ALL TIME" - ) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - GroupByQuery.builder() - .setDataSource(externalDataSource) - .setGranularity(Granularities.ALL) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setVirtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) - ) - .setDimensions( - dimensions( - new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), - new DefaultDimensionSpec("b", "d1", ColumnType.STRING), - new DefaultDimensionSpec("c", "d3", 
ColumnType.LONG), - new DefaultDimensionSpec("d", "d4", ColumnType.LONG), - new DefaultDimensionSpec("e", "d5", ColumnType.STRING) - ) - ) - .setAggregatorSpecs( - new CardinalityAggregatorFactory( - "a0", - null, - ImmutableList.of( - new DefaultDimensionSpec( - "c", - "c", - ColumnType.LONG - ) - ), - false, - true - ) - ) - .setPostAggregatorSpecs( - expressionPostAgg("p0", "1", ColumnType.LONG), - expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a sealed table should fail with - * proper validation error. - */ - @Test - public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable() - { - testIngestionQuery() - .sql("REPLACE INTO fooSealed OVERWRITE ALL\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" - ) - .verify(); - } - - - /** - * Replacing into a catalog table with a WITH source succeeds - */ - @Test - public void testReplaceWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - 
.add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM \"ext\"\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. Here we just check that the - // set of columns is correct, but not their order. 
- .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. - */ - @Test - public void testGroupByReplaceWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .add("extra4_complex", ColumnType.LONG) - .build(); - testIngestionQuery() - .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3,\n" + - " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + - "FROM \"ext\"\n" + - "GROUP BY 1,2,3,4,5,6\n" + - "PARTITIONED BY ALL TIME" - ) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - GroupByQuery.builder() - .setDataSource(externalDataSource) - .setGranularity(Granularities.ALL) - 
.setInterval(querySegmentSpec(Filtration.eternity())) - .setVirtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) - ) - .setDimensions( - dimensions( - new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), - new DefaultDimensionSpec("b", "d1", ColumnType.STRING), - new DefaultDimensionSpec("c", "d3", ColumnType.LONG), - new DefaultDimensionSpec("d", "d4", ColumnType.LONG), - new DefaultDimensionSpec("e", "d5", ColumnType.STRING) - ) - ) - .setAggregatorSpecs( - new CardinalityAggregatorFactory( - "a0", - null, - ImmutableList.of( - new DefaultDimensionSpec( - "c", - "c", - ColumnType.LONG - ) - ), - false, - true - ) - ) - .setPostAggregatorSpecs( - expressionPostAgg("p0", "1", ColumnType.LONG), - expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - @Test - public void testReplaceIntoExistingStrictNoDefinedSchema() - { - testIngestionQuery() - .sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") - .verify(); - } - - @Test - public void testReplaceIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getOperationName() { - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "REPLACE"; } - @Test - public 
void testGroupByReplaceIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getDmlPrefixPattern() { - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS unique_dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "REPLACE INTO \"%s\" OVERWRITE ALL"; } } From 853ea765cd2aea9fc4ae7704fa8c86e8fc718144 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 2 Apr 2024 00:30:00 -0400 Subject: [PATCH 6/9] * fix complex type nullability issue --- .../sql/calcite/planner/DruidSqlValidator.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 2531af065cad..dcb4adbd65cb 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -60,6 +60,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; @@ -478,11 +479,17 @@ private RelDataType validateTargetType( if (sqlTypeName != null) { relType = typeFactory.createSqlType(sqlTypeName); } else { - relType = RowSignatures.columnTypeToRelDataType( - typeFactory, - ColumnType.fromString(definedCol.sqlStorageType()), - sourceField.getType().isNullable() - ); + ColumnType columnType = 
ColumnType.fromString(definedCol.sqlStorageType()); + if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) { + relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable()); + } else { + relType = RowSignatures.columnTypeToRelDataType( + typeFactory, + columnType, + // this nullability is ignored for complex types for some reason, hence the check for complex above. + sourceField.getType().isNullable() + ); + } } fields.add(Pair.of( From fdf2140110ba08304c7ad88d41a4e6aa7f68b061 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 16 Apr 2024 11:25:30 -0400 Subject: [PATCH 7/9] * address review comments --- .../calcite/planner/DruidSqlValidator.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 78a25d523b0d..03dbbd4b7a7d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -475,24 +475,7 @@ private RelDataType validateTargetType( fields.add(Pair.of(colName, sourceField.getType())); continue; } - SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); - RelDataType relType; - if (sqlTypeName != null) { - relType = typeFactory.createSqlType(sqlTypeName); - } else { - ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType()); - if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) { - relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable()); - } else { - relType = RowSignatures.columnTypeToRelDataType( - typeFactory, - columnType, - // this nullability is ignored for complex types for some reason, hence the check for complex above. 
- sourceField.getType().isNullable() - ); - } - } - + RelDataType relType = computeTypeForDefinedCol(definedCol, sourceField); fields.add(Pair.of( colName, typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable()) @@ -526,18 +509,14 @@ protected void checkTypeAssignment( RelDataType targetFieldRelDataType = targetFields.get(i).getType(); ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType); ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType); - - boolean incompatible; try { - incompatible = !Objects.equals( + if (!Objects.equals( targetFieldColumnType, - ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType) - ); + ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType))) { + throw new Types.IncompatibleTypeException(targetFieldColumnType, sourceFieldColumnType); + } } catch (Types.IncompatibleTypeException e) { - incompatible = true; - } - if (incompatible) { SqlNode node = getNthExpr(query, i, sourceCount); String targetTypeString; String sourceTypeString; @@ -554,12 +533,39 @@ protected void checkTypeAssignment( Static.RESOURCE.typeNotAssignable( targetFields.get(i).getName(), targetTypeString, sourceFields.get(i).getName(), sourceTypeString)); + } } // the call to base class definition will insert implicit casts / coercions where needed. 
super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query); } + protected RelDataType computeTypeForDefinedCol( + final DatasourceFacade.ColumnFacade definedCol, + final RelDataTypeField sourceField + ) + { + SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); + RelDataType relType; + if (sqlTypeName != null) { + relType = typeFactory.createSqlType(sqlTypeName); + } else { + ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType()); + if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) { + relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable()); + } else { + relType = RowSignatures.columnTypeToRelDataType( + typeFactory, + columnType, + // this nullability is ignored for complex types for some reason, hence the check for complex above. + sourceField.getType().isNullable() + ); + } + } + + return relType; + } + /** * Locates the n'th expression in an INSERT or UPDATE query. 
* From 736a7c8ce5aaa3d1fc58cbc3eb795cf390db49ac Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 16 Apr 2024 11:28:30 -0400 Subject: [PATCH 8/9] * address test review comments --- .../java/org/apache/druid/catalog/sql/CatalogInsertTest.java | 2 +- .../java/org/apache/druid/catalog/sql/CatalogReplaceTest.java | 2 +- .../druid/sql/calcite/CalciteCatalogIngestionDmlTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java index a4e927b4546e..5b83a7d6cf7d 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -70,7 +70,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void buildDatasources() { RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { - DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java index b0d0c707335b..d9dd9dda1840 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java @@ -69,7 +69,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void 
buildDatasources() { RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { - DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java index f7b8542e143a..4715096e6bb6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -66,7 +66,7 @@ public CalciteCatalogIngestionDmlTest() public abstract String getDmlPrefixPattern(); private static final ObjectMapper MAPPER = new DefaultObjectMapper(); - public static ImmutableMap RESOLVED_TABLES = ImmutableMap.of( + public static ImmutableMap RESOLVED_TABLES = ImmutableMap.of( "hourDs", new DatasourceTable( RowSignature.builder().addTimeColumn().build(), new DatasourceTable.PhysicalDatasourceMetadata( From 7ad8289b2a2f3b9bcfc6152992e0738757fef4a6 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 16 Apr 2024 13:57:04 -0400 Subject: [PATCH 9/9] * fix checkstyle --- .../java/org/apache/druid/catalog/sql/CatalogInsertTest.java | 1 - .../java/org/apache/druid/catalog/sql/CatalogReplaceTest.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java index 5b83a7d6cf7d..49ebf3f11f72 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java +++ 
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -31,7 +31,6 @@ import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; import org.apache.druid.sql.calcite.CalciteCatalogInsertTest; import org.apache.druid.sql.calcite.planner.CatalogResolver; -import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.util.SqlTestFramework; import org.junit.jupiter.api.extension.RegisterExtension; diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java index d9dd9dda1840..31e0a34112cf 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java @@ -31,7 +31,6 @@ import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest; import org.apache.druid.sql.calcite.planner.CatalogResolver; -import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.util.SqlTestFramework; import org.junit.jupiter.api.extension.RegisterExtension; @@ -92,7 +91,7 @@ public void buildDatasources() createTableMetadata(tableBuilder.build()); }); DatasourceFacade catalogMetadata = - ((DatasourceTable) RESOLVED_TABLES.get("foo")).effectiveMetadata().catalogMetadata(); + RESOLVED_TABLES.get("foo").effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> {