[SPARK-50373] Reject set operations on VARIANT columns during analysis

Summary of the changes carried by this patch:
  * error-conditions.json: adds the SET_OPERATION_ON_VARIANT_TYPE entry next
    to the existing SET_OPERATION_ON_MAP_TYPE entry. Its message uses the
    <colName> and <dataType> parameters supplied by CheckAnalysis.
  * CheckAnalysis.scala: adds hasVariantType and variantColumnInSetOperation,
    plus a check that fails analysis with
    UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE when the output of an
    Intersect / Except / Distinct, or the keys of a Deduplicate, contain a
    VariantType (also when nested inside another type). The new case sits
    after the existing MAP check, so a MAP that contains a VARIANT is still
    reported as SET_OPERATION_ON_MAP_TYPE (see the last test case).
  * DataFrameSetOperationsSuite.scala: adds a test covering intersect,
    except, distinct, dropDuplicates and SELECT DISTINCT over plain and
    nested VARIANT columns.

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 38b1656ac05c1..a491680ea629b 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5355,6 +5355,11 @@
           "Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
         ]
       },
+      "SET_OPERATION_ON_VARIANT_TYPE" : {
+        "message" : [
+          "Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
+        ]
+      },
       "SET_PROPERTIES_AND_DBPROPERTIES" : {
         "message" : [
           "set PROPERTIES and DBPROPERTIES at the same time."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 586a0312e1507..71c150d69055d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -76,6 +76,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     dt.existsRecursively(_.isInstanceOf[MapType])
   }
 
+  protected def hasVariantType(dt: DataType): Boolean = {
+    dt.existsRecursively(_.isInstanceOf[VariantType])
+  }
+
   protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
     case _: Intersect | _: Except | _: Distinct =>
       plan.output.find(a => hasMapType(a.dataType))
@@ -84,6 +88,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     case _ => None
   }
 
+  protected def variantColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
+    case _: Intersect | _: Except | _: Distinct =>
+      plan.output.find(a => hasVariantType(a.dataType))
+    case d: Deduplicate =>
+      d.keys.find(a => hasVariantType(a.dataType))
+    case _ => None
+  }
+
   private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
     limitExpr match {
       case e if !e.foldable => limitExpr.failAnalysis(
@@ -820,6 +832,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
                 "colName" -> toSQLId(mapCol.name),
                 "dataType" -> toSQLType(mapCol.dataType)))
 
+          // TODO: Remove this type check once we support Variant ordering
+          case o if variantColumnInSetOperation(o).isDefined =>
+            val variantCol = variantColumnInSetOperation(o).get
+            o.failAnalysis(
+              errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+              messageParameters = Map(
+                "colName" -> toSQLId(variantCol.name),
+                "dataType" -> toSQLType(variantCol.dataType)))
+
           case o if o.expressions.exists(!_.deterministic) &&
             !operatorAllowsNonDeterministicExpressions(o) &&
             !o.isInstanceOf[Project] &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 5ff737d2b57cb..9c182be0f7dd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -350,6 +350,84 @@ class DataFrameSetOperationsSuite extends QueryTest
     dates.intersect(widenTypedRows).collect()
   }
 
+  test("SPARK-50373 - cannot run set operations with variant type") {
+    val df = sql("select parse_json(case when id = 0 then 'null' else '1' end)" +
+      " as v, id % 5 as id from range(0, 100, 1, 5)")
+    checkError(
+      exception = intercept[AnalysisException](df.intersect(df)),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.except(df)),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.distinct()),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\""))
+    checkError(
+      exception = intercept[AnalysisException](df.dropDuplicates()),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\""))
+    withTempView("tv") {
+      df.createOrReplaceTempView("tv")
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT v FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`v`",
+          "dataType" -> "\"VARIANT\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT v FROM tv",
+          start = 0,
+          stop = 24)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT STRUCT(v) FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`struct(v)`",
+          "dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT STRUCT(v) FROM tv",
+          start = 0,
+          stop = 32)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT ARRAY(v) FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`array(v)`",
+          "dataType" -> "\"ARRAY<VARIANT>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT ARRAY(v) FROM tv",
+          start = 0,
+          stop = 31)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT MAP('m', v) FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+        parameters = Map(
+          "colName" -> "`map(m, v)`",
+          "dataType" -> "\"MAP<STRING, VARIANT>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT MAP('m', v) FROM tv",
+          start = 0,
+          stop = 34)
+      )
+    }
+  }
+
   test("SPARK-19893: cannot run set operations with map type") {
     val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
     checkError(