diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index b18db93f6291a..bca793ba8e449 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5303,6 +5303,11 @@
         "Parameter markers are not allowed in <statement>."
       ]
     },
+    "PARTITION_BY_VARIANT" : {
+      "message" : [
+        "Cannot use VARIANT producing expressions to partition a DataFrame, but the type of expression <expr> is <dataType>."
+      ]
+    },
     "PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED" : {
       "message" : [
         "Invalid partitioning: <cols> is missing or is in a map or array."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d4b97ff037f36..d9a8b0a7a7fed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -96,6 +96,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     case _ => None
   }
 
+  protected def variantExprInPartitionExpression(plan: LogicalPlan): Option[Expression] =
+    plan match {
+      case r: RepartitionByExpression =>
+        r.partitionExpressions.find(e => hasVariantType(e.dataType))
+      case _ => None
+    }
+
   private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
     limitExpr match {
       case e if !e.foldable => limitExpr.failAnalysis(
@@ -849,6 +856,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
               "colName" -> toSQLId(variantCol.name),
               "dataType" -> toSQLType(variantCol.dataType)))
 
+        case o if variantExprInPartitionExpression(o).isDefined =>
+          val variantExpr = variantExprInPartitionExpression(o).get
+          o.failAnalysis(
+            errorClass = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+            messageParameters = Map(
+              "expr" -> variantExpr.sql,
+              "dataType" -> toSQLType(variantExpr.dataType)))
+
         case o if o.expressions.exists(!_.deterministic) &&
           !operatorAllowsNonDeterministicExpressions(o) &&
           !o.isInstanceOf[Project] &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c1d977dad82d2..0972a63a2495c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -308,6 +308,69 @@ class DataFrameSuite extends QueryTest
       testData.select("key").collect().toSeq)
   }
 
+  test("SPARK-50503 - cannot partition by variant columns") {
+    val df = sql("select parse_json(case when id = 0 then 'null' else '1' end)" +
+      " as v, id % 5 as id, named_struct('v', parse_json(id::string)) s from range(0, 100, 1, 5)")
+    // variant column
+    checkError(
+      exception = intercept[AnalysisException](df.repartition(5, col("v"))),
+      condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+      parameters = Map(
+        "expr" -> "v",
+        "dataType" -> "\"VARIANT\"")
+    )
+    // nested variant column
+    checkError(
+      exception = intercept[AnalysisException](df.repartition(5, col("s"))),
+      condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+      parameters = Map(
+        "expr" -> "s",
+        "dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\"")
+    )
+    // variant producing expression
+    checkError(
+      exception =
+        intercept[AnalysisException](df.repartition(5, parse_json(col("id").cast("string")))),
+      condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+      parameters = Map(
+        "expr" -> "parse_json(CAST(id AS STRING))",
+        "dataType" -> "\"VARIANT\"")
+    )
+    // Partitioning by a non-variant column works
+    try {
+      df.repartition(5, col("id")).collect()
+    } catch {
+      case e: Exception =>
+        fail(s"Expected no exception to be thrown but an exception was thrown: ${e.getMessage}")
+    }
+    // SQL
+    withTempView("tv") {
+      df.createOrReplaceTempView("tv")
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY v")),
+        condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+        parameters = Map(
+          "expr" -> "tv.v",
+          "dataType" -> "\"VARIANT\""),
+        context = ExpectedContext(
+          fragment = "DISTRIBUTE BY v",
+          start = 17,
+          stop = 31)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY s")),
+        condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
+        parameters = Map(
+          "expr" -> "tv.s",
+          "dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
+        context = ExpectedContext(
+          fragment = "DISTRIBUTE BY s",
+          start = 17,
+          stop = 31)
+      )
+    }
+  }
+
   test("repartition with SortOrder") {
     // passing SortOrder expressions to .repartition() should result in an informative error
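For quick manual verification, a minimal sketch of the user-facing behavior this patch introduces, distilled from the new test above. It assumes a `spark-shell` (or any `SparkSession` bound to `spark`) built with these changes; matching the condition via `getMessage` is a convenience of the sketch, not an API the patch adds.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.col

// A DataFrame with a top-level variant column `v`, as in the new test.
val df = spark.sql("select parse_json(id::string) as v, id from range(10)")

// Repartitioning by a variant-typed expression is now rejected eagerly,
// when the repartitioned Dataset is analyzed.
try {
  df.repartition(5, col("v"))
} catch {
  case e: AnalysisException =>
    assert(e.getMessage.contains("UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT"))
}

// Partitioning by a non-variant column is unaffected.
df.repartition(5, col("id")).collect()

// The same check fires on the SQL DISTRIBUTE BY path.
df.createOrReplaceTempView("tv")
try {
  spark.sql("SELECT * FROM tv DISTRIBUTE BY v")
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```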