From 8b07168da09bae132b4720d544f1c41eb8b692dd Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 8 Aug 2023 16:17:02 +0500 Subject: [PATCH 01/15] Error on missing input columns in INSERT --- .../spark/sql/execution/datasources/rules.scala | 4 ++-- .../sql/errors/QueryCompilationErrorsSuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 4cbd54e6d209c..a5246d4419c6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -403,9 +403,9 @@ object PreprocessTableInsertion extends ResolveInsertionBase { insert.query } val newQuery = try { + val byName = hasColumnList || insert.byName TableOutputResolver.resolveOutputColumns( - tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf, - supportColDefaultValue = true) + tblName, expectedColumns, query, byName, conf, supportColDefaultValue = byName) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index 7f938deaaa645..cf0998ef37067 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -913,6 +913,22 @@ class QueryCompilationErrorsSuite ) } } + + test("SPARK-43438: mismatched column list error on INSERT") { + val tbl = "num_cols_insert_tbl" + withTable(tbl) { + sql(s"CREATE TABLE $tbl(c1 INT, c2 INT) USING parquet") + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO $tbl SELECT 1") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`num_cols_insert_tbl`", + "tableColumns" -> "`c1`, `c2`", + "dataColumns" -> "`1`")) + } + } } class MyCastToString extends SparkUserDefinedFunction( From c73695289f2d5b8c88d421267e3d17809866b205 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Aug 2023 11:37:05 +0500 Subject: [PATCH 02/15] Add comments and fix tests --- .../sql/execution/datasources/rules.scala | 13 +++- .../sql/ResolveDefaultColumnsSuite.scala | 59 +++++++++++++++---- .../errors/QueryCompilationErrorsSuite.scala | 16 ----- 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index a5246d4419c6f..26325058346c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -403,9 +403,18 @@ object PreprocessTableInsertion extends ResolveInsertionBase { insert.query } val newQuery = try { - val byName = hasColumnList || insert.byName TableOutputResolver.resolveOutputColumns( - tblName, expectedColumns, query, byName, conf, supportColDefaultValue = byName) + tblName, + expectedColumns, + query, + byName = hasColumnList || insert.byName, + conf, + // According to the SQL standard, 14.11.7 + // INSERT t SELECT ... is the same as INSERT t (c1, c2, ... all columns) SELECT, + // and we require the number of columns to match w/o default values. + // In other cases like in INSERT BY NAME and in INSERT w/ user-specified column list + // INSERT t (c1, c2, ..., ck), we allow default values. + supportColDefaultValue = hasColumnList || insert.byName) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala index b2cc4e3b746aa..29b2796d25aa4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala @@ -35,9 +35,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession { // INSERT without user-defined columns sql("truncate table t") - sql("insert into t values (timestamp'2020-12-31')") - checkAnswer(spark.table("t"), - sql("select timestamp'2020-12-31', null").collect().head) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values (timestamp'2020-12-31')") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`c1`, `c2`", + "dataColumns" -> "`col1`")) } } @@ -57,9 +63,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession { // INSERT without user-defined columns sql("truncate table t") - sql("insert into t values (timestamp'2020-12-31')") - checkAnswer(spark.table("t"), - sql("select timestamp'2020-12-31', timestamp'2020-01-01'").collect().head) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values (timestamp'2020-12-31')") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`c1`, `c2`", + "dataColumns" -> "`col1`")) } } @@ -67,8 +79,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession { sql("create table t(c1 int, c2 int, c3 int, c4 int) using parquet partitioned by (c3, c4)") // INSERT without static partitions - sql("insert into t values (1, 2, 3)") - checkAnswer(spark.table("t"), Row(1, 2, 3, null)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values (1, 2, 3)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`c1`, `c2`, `c3`, `c4`", + "dataColumns" -> "`col1`, `col2`, `col3`")) // INSERT without static partitions but with column list sql("truncate table t") @@ -77,8 +96,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession { // INSERT with static partitions sql("truncate table t") - sql("insert into t partition(c3=3, c4=4) values (1)") - checkAnswer(spark.table("t"), Row(1, null, 3, 4)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t partition(c3=3, c4=4) values (1)") + }, + errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`c1`, `c2`, `c3`, `c4`", + "dataColumns" -> "`col1`", + "staticPartCols" -> "`c3`, `c4`")) // INSERT with static partitions and with column list sql("truncate table t") @@ -87,8 +114,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession { // INSERT with partial static partitions sql("truncate table t") - sql("insert into t partition(c3=3, c4) values (1, 2)") - checkAnswer(spark.table("t"), Row(1, 2, 3, null)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t partition(c3=3, c4) values (1, 2)") + }, + errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`c1`, `c2`, `c3`, `c4`", + "dataColumns" -> "`col1`, `col2`", + "staticPartCols" -> "`c3`")) // INSERT with partial static partitions and with column list is not allowed intercept[AnalysisException](sql("insert into t partition(c3=3, c4) (c1) values (1, 4)")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index cf0998ef37067..7f938deaaa645 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -913,22 +913,6 @@ class QueryCompilationErrorsSuite ) } } - - test("SPARK-43438: mismatched column list error on INSERT") { - val tbl = "num_cols_insert_tbl" - withTable(tbl) { - sql(s"CREATE TABLE $tbl(c1 INT, c2 INT) USING parquet") - checkError( - exception = intercept[AnalysisException] { - sql(s"INSERT INTO $tbl SELECT 1") - }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", - parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`num_cols_insert_tbl`", - "tableColumns" -> "`c1`, `c2`", - "dataColumns" -> "`1`")) - } - } } class MyCastToString extends SparkUserDefinedFunction( From cd9d3d0bd39d97db1f30055f5db7719edb80fece Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 9 Aug 2023 21:36:15 +0500 Subject: [PATCH 03/15] Fix InsertSuite --- .../spark/sql/sources/InsertSuite.scala | 24 +++++++++++-------- .../apache/spark/sql/hive/InsertSuite.scala | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index c6bfd8c14ddf7..680979d93eed1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -962,11 +962,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { (1 to 10).map(i => Row(i, null)) ) - sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") - checkAnswer( - sql("SELECT a, b FROM jsonTable"), - (1 to 10).map(i => Row(i, null)) - ) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`unknown`", + "tableColumns" -> "`a`, `b`", + "dataColumns" -> "`a`")) sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt") checkAnswer( @@ -1027,13 +1031,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } withTable("t") { sql("create table t(i int, s bigint default 42, x bigint) using parquet") - sql("insert into t values(1)") + sql("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42L, null)) } // The table has a partitioning column and a default value is injected. withTable("t") { sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)") - sql("insert into t partition(i='true') values(5, default)") + sql("insert into t partition(i='true') (s, q) values(5, default)") checkAnswer(spark.table("t"), Row(5, 42, true)) } // The table has a partitioning column and a default value is added per an explicit reference. @@ -1495,14 +1499,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql(createTableIntCol) sql("alter table t add column s bigint default 42") sql("alter table t add column x bigint") - sql("insert into t values(1)") + sql("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, null)) } // The table has a partitioning column and a default value is injected. withTable("t") { sql("create table t(i boolean, s bigint) using parquet partitioned by (i)") sql("alter table t add column q int default 42") - sql("insert into t partition(i='true') values(5, default)") + sql("insert into t partition(i='true') (s, q) values(5, default)") checkAnswer(spark.table("t"), Row(5, 42, true)) } // The default value parses correctly as a constant but non-literal expression. @@ -1517,7 +1521,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(i boolean default false) using parquet") sql("alter table t add column s bigint default 42") - sql("insert into t values(false, default), (default, 42)") + sql("insert into t(i, s) values(false, default), (default, 42)") checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42))) } // There is an explicit default value provided in the INSERT INTO statement in the VALUES, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 420b4fc83ec91..ea43f1d2c6729 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -391,7 +391,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12") // The data is missing a column. The default value for the missing column is null. - sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13") + sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) (a) SELECT 13") // c is defined twice. Analyzer will complain. intercept[ParseException] { From e565071cb9b2732cd7030644a38dd8efd784ad57 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 10 Aug 2023 11:38:31 +0500 Subject: [PATCH 04/15] Fix HiveQuerySuite --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4eae3933bf511..a5389607ebbb8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1255,7 +1255,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd checkError( exception = intercept[AnalysisException] { sql( - """INSERT INTO TABLE dp_test PARTITION(dp) + """INSERT INTO TABLE dp_test PARTITION(dp) (key, value, dp) |SELECT key, value, key % 5 FROM src""".stripMargin) }, errorClass = "_LEGACY_ERROR_TEMP_1169", From 049dbd2f5ca84dc8e705d374b841ca75d489635e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 10 Aug 2023 16:02:04 +0500 Subject: [PATCH 05/15] Checks column match in TableOutputResolver.resolveOutputColumns --- .../analysis/TableOutputResolver.scala | 29 +++++-------------- .../sql/execution/datasources/rules.scala | 8 +---- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 894cd0b39911f..c1aa0cddb56a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -39,9 +39,7 @@ object TableOutputResolver { expected: Seq[Attribute], query: LogicalPlan, byName: Boolean, - conf: SQLConf, - // TODO: Only DS v1 writing will set it to true. We should enable in for DS v2 as well. - supportColDefaultValue: Boolean = false): LogicalPlan = { + conf: SQLConf): LogicalPlan = { val actualExpectedCols = expected.map { attr => attr.withDataType(CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType)) @@ -50,37 +48,24 @@ object TableOutputResolver { if (actualExpectedCols.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( tableName, actualExpectedCols.map(_.name), query) + } else if (actualExpectedCols.size > query.output.size && !byName) { + throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( + tableName, actualExpectedCols.map(_.name), query) } val errors = new mutable.ArrayBuffer[String]() val resolved: Seq[NamedExpression] = if (byName) { // If a top-level column does not have a corresponding value in the input query, fill with - // the column's default value. We need to pass `fillDefaultValue` as true here, if the - // `supportColDefaultValue` parameter is also true. + // the column's default value. reorderColumnsByName( tableName, query.output, actualExpectedCols, conf, errors += _, - fillDefaultValue = supportColDefaultValue) + fillDefaultValue = true) } else { - // If the target table needs more columns than the input query, fill them with - // the columns' default values, if the `supportColDefaultValue` parameter is true. - val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size - val queryOutputCols = if (fillDefaultValue) { - query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol => - getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) - } - } else { - query.output - } - if (actualExpectedCols.size > queryOutputCols.size) { - throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( - tableName, actualExpectedCols.map(_.name), query) - } - - resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _) + resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _) } if (errors.nonEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 26325058346c7..a83729bab9b84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -408,13 +408,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { expectedColumns, query, byName = hasColumnList || insert.byName, - conf, - // According to the SQL standard, 14.11.7 - // INSERT t SELECT ... is the same as INSERT t (c1, c2, ... all columns) SELECT, - // and we require the number of columns to match w/o default values. - // In other cases like in INSERT BY NAME and in INSERT w/ user-specified column list - // INSERT t (c1, c2, ..., ck), we allow default values. - supportColDefaultValue = hasColumnList || insert.byName) + conf) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || From c9b1fb34e35296b6b37578dcc14f35f6dec1e71c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 23 Aug 2023 13:47:54 +0300 Subject: [PATCH 06/15] Fix DSV2SQLInsertTestSuite.insert by name: mismatch column name --- .../test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 0bbed51d0a908..4c2a54b3259b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -214,9 +214,9 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { processInsert("t1", df, overwrite = false, byName = true) }, v1ErrorClass = "_LEGACY_ERROR_TEMP_1186", - v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + v2ErrorClass = "_LEGACY_ERROR_TEMP_1186", v1Parameters = Map.empty[String, String], - v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`") + v2Parameters = Map.empty[String, String] ) val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*) checkV1AndV2Error( From 722685d448fd4665c624049742bd8f44c791df0c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 23 Aug 2023 14:07:03 +0300 Subject: [PATCH 07/15] Fix DataFrameWriterV2Suite --- .../apache/spark/sql/DataFrameWriterV2Suite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index f58f798b8dec9..db59efc3d4c55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -145,8 +145,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo exception = intercept[AnalysisException] { spark.table("source").withColumnRenamed("data", "d").writeTo("testcat.table_name").append() }, - errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + errorClass = "_LEGACY_ERROR_TEMP_1186", + parameters = Map.empty ) checkAnswer( @@ -250,8 +250,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwrite(lit(true)) }, - errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + errorClass = "_LEGACY_ERROR_TEMP_1186", + parameters = Map.empty ) checkAnswer( @@ -355,8 +355,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwritePartitions() }, - errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + errorClass = "_LEGACY_ERROR_TEMP_1186", + parameters = Map.empty ) checkAnswer( From 51d3fbf8e8b52b93211ceb5a37487fc887a9ce29 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 23 Aug 2023 14:11:14 +0300 Subject: [PATCH 08/15] Fix V2AppendDataANSIAnalysisSuite --- .../sql/catalyst/analysis/V2WriteAnalysisSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala index d91a080d8fe89..bad2bddf447fa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala @@ -394,8 +394,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisErrorClass( parsedPlan, - expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") + expectedErrorClass = "_LEGACY_ERROR_TEMP_1186", + expectedMessageParameters = Map.empty ) } @@ -409,8 +409,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisErrorClass( parsedPlan, - expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") + expectedErrorClass = "_LEGACY_ERROR_TEMP_1186", + expectedMessageParameters = Map.empty ) } @@ -477,7 +477,7 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { ) } - test("byName: missing optional columns cause failure and are identified by name") { + ignore("byName: missing optional columns cause failure and are identified by name") { // missing optional field x val query = TestRelation(StructType(Seq( StructField("y", FloatType)))) From 01f011792df8b5ad0a17f52756ad0d5c50609665 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 23 Aug 2023 14:14:19 +0300 Subject: [PATCH 09/15] Remove a test --- .../catalyst/analysis/V2WriteAnalysisSuite.scala | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala index bad2bddf447fa..454cee3973b87 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala @@ -477,21 +477,6 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { ) } - ignore("byName: missing optional columns cause failure and are identified by name") { - // missing optional field x - val query = TestRelation(StructType(Seq( - StructField("y", FloatType)))) - - val parsedPlan = byName(table, query) - - assertNotResolved(parsedPlan) - assertAnalysisErrorClass( - parsedPlan, - expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") - ) - } - test("byName: insert safe cast") { val x = table.output.head val y = table.output.last From 96cab939e91f33913ebed4d1dae3fc3fc19a7275 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 25 Aug 2023 08:54:41 +0300 Subject: [PATCH 10/15] Trigger build From 9c3188cae15eec78f0f7981c152107cdc239e525 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 25 Aug 2023 16:37:39 +0300 Subject: [PATCH 11/15] Trigger build From e00fdbe1a45af61797eeb40077dfbe315ec6c1e8 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 26 Aug 2023 21:34:30 +0300 Subject: [PATCH 12/15] Trigger build From 87f7e9637fd99a39dcb979c7dfaf721650bbf349 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 29 Aug 2023 08:58:01 +0300 Subject: [PATCH 13/15] Move the check --- .../spark/sql/catalyst/analysis/TableOutputResolver.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 108285e26f178..826ca67a2b3e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -86,9 +86,6 @@ object TableOutputResolver { if (actualExpectedCols.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( tableName, actualExpectedCols.map(_.name), query) - } else if (actualExpectedCols.size > query.output.size && !byName) { - throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( - tableName, actualExpectedCols.map(_.name), query) } val errors = new mutable.ArrayBuffer[String]() @@ -103,6 +100,10 @@ object TableOutputResolver { errors += _, fillDefaultValue = true) } else { + if (actualExpectedCols.size > query.output.size) { + throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( + tableName, actualExpectedCols.map(_.name), query) + } resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _) } From 60f9327f38bb0e120a496c330ef21c04756c44e7 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 29 Aug 2023 09:31:02 +0300 Subject: [PATCH 14/15] Address review comments --- .../analysis/TableOutputResolver.scala | 6 +++-- .../analysis/V2WriteAnalysisSuite.scala | 23 +++++++++++++++---- .../sql/execution/datasources/rules.scala | 3 ++- .../spark/sql/DataFrameWriterV2Suite.scala | 12 +++++----- .../apache/spark/sql/SQLInsertTestSuite.scala | 4 ++-- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 826ca67a2b3e7..2f12ee3b9c374 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -77,7 +77,9 @@ object TableOutputResolver { expected: Seq[Attribute], query: LogicalPlan, byName: Boolean, - conf: SQLConf): LogicalPlan = { + conf: SQLConf, + // TODO: Only DS v1 writing will set it to true. We should enable in for DS v2 as well. + supportColDefaultValue: Boolean = false): LogicalPlan = { val actualExpectedCols = expected.map { attr => attr.withDataType(CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType)) @@ -98,7 +100,7 @@ object TableOutputResolver { actualExpectedCols, conf, errors += _, - fillDefaultValue = true) + fillDefaultValue = supportColDefaultValue) } else { if (actualExpectedCols.size > query.output.size) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala index 454cee3973b87..d91a080d8fe89 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala @@ -394,8 +394,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisErrorClass( parsedPlan, - expectedErrorClass = "_LEGACY_ERROR_TEMP_1186", - expectedMessageParameters = Map.empty + expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") ) } @@ -409,8 +409,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { assertNotResolved(parsedPlan) assertAnalysisErrorClass( parsedPlan, - expectedErrorClass = "_LEGACY_ERROR_TEMP_1186", - expectedMessageParameters = Map.empty + expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") ) } @@ -477,6 +477,21 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { ) } + test("byName: missing optional columns cause failure and are identified by name") { + // missing optional field x + val query = TestRelation(StructType(Seq( + StructField("y", FloatType)))) + + val parsedPlan = byName(table, query) + + assertNotResolved(parsedPlan) + assertAnalysisErrorClass( + parsedPlan, + expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") + ) + } + test("byName: insert safe cast") { val x = table.output.head val y = table.output.last diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index a83729bab9b84..f9b3f73ff0220 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -408,7 +408,8 @@ object PreprocessTableInsertion extends ResolveInsertionBase { expectedColumns, query, byName = hasColumnList || insert.byName, - conf) + conf, + supportColDefaultValue = true) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index db59efc3d4c55..f58f798b8dec9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -145,8 +145,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo exception = intercept[AnalysisException] { spark.table("source").withColumnRenamed("data", "d").writeTo("testcat.table_name").append() }, - errorClass = "_LEGACY_ERROR_TEMP_1186", - parameters = Map.empty + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") ) checkAnswer( @@ -250,8 +250,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwrite(lit(true)) }, - errorClass = "_LEGACY_ERROR_TEMP_1186", - parameters = Map.empty + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") ) checkAnswer( @@ -355,8 +355,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwritePartitions() }, - errorClass = "_LEGACY_ERROR_TEMP_1186", - parameters = Map.empty + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") ) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 4c2a54b3259b1..0bbed51d0a908 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -214,9 +214,9 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { processInsert("t1", df, overwrite = false, byName = true) }, v1ErrorClass = "_LEGACY_ERROR_TEMP_1186", - v2ErrorClass = "_LEGACY_ERROR_TEMP_1186", + v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", v1Parameters = Map.empty[String, String], - v2Parameters = Map.empty[String, String] + v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`") ) val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*) checkV1AndV2Error( From b644f8f7ae791e8f5cbe5eb29df2d2912bced936 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 29 Aug 2023 14:29:40 +0300 Subject: [PATCH 15/15] Address review comments --- .../spark/sql/catalyst/analysis/TableOutputResolver.scala | 3 ++- .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 6 +++--- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 ++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 2f12ee3b9c374..21575f7b96bed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -93,7 +93,8 @@ object TableOutputResolver { val errors = new mutable.ArrayBuffer[String]() val resolved: Seq[NamedExpression] = if (byName) { // If a top-level column does not have a corresponding value in the input query, fill with - // the column's default value. + // the column's default value. We need to pass `fillDefaultValue` as true here, if the + // `supportColDefaultValue` parameter is also true. reorderColumnsByName( tableName, query.output, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index fa0e8daa1800e..cf1f4d4d4f28f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1037,7 +1037,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // The table has a partitioning column and a default value is injected. withTable("t") { sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)") - sql("insert into t partition(i='true') (s, q) values(5, default)") + sql("insert into t partition(i='true') values(5, default)") checkAnswer(spark.table("t"), Row(5, 42, true)) } // The table has a partitioning column and a default value is added per an explicit reference. @@ -1506,7 +1506,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(i boolean, s bigint) using parquet partitioned by (i)") sql("alter table t add column q int default 42") - sql("insert into t partition(i='true') (s, q) values(5, default)") + sql("insert into t partition(i='true') values(5, default)") checkAnswer(spark.table("t"), Row(5, 42, true)) } // The default value parses correctly as a constant but non-literal expression. @@ -1521,7 +1521,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(i boolean default false) using parquet") sql("alter table t add column s bigint default 42") - sql("insert into t(i, s) values(false, default), (default, 42)") + sql("insert into t values(false, default), (default, 42)") checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42))) } // There is an explicit default value provided in the INSERT INTO statement in the VALUES, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index a5389607ebbb8..82b88ec9f35d6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1255,14 +1255,14 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd checkError( exception = intercept[AnalysisException] { sql( - """INSERT INTO TABLE dp_test PARTITION(dp) (key, value, dp) + """INSERT INTO TABLE dp_test PARTITION(dp) |SELECT key, value, key % 5 FROM src""".stripMargin) }, - errorClass = "_LEGACY_ERROR_TEMP_1169", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`dp_test`", - "normalizedPartSpec" -> "dp", - "partColNames" -> "dp,sp")) + "tableColumns" -> "`key`, `value`, `dp`, `sp`", + "dataColumns" -> "`key`, `value`, `(key % 5)`")) sql("SET hive.exec.dynamic.partition.mode=nonstrict")