diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index e6ee3f79ee32..451b13d3f5fc 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.DeltaLogFileIndex import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.util.SparkPlanRules @@ -125,6 +126,7 @@ object CHRuleApi { c => intercept( SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session))) + injector.injectPost(c => GlutenNoopWriterRule.apply(c.session)) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 72d769c999e8..d5aa3858428e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -36,6 +36,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.joins.BaseJoinExec @@ -103,6 +104,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => each(c.session))) injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPost(c => GlutenNoopWriterRule(c.session)) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) @@ -175,6 +177,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPostTransform(c => each(c.session))) injector.injectPostTransform(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPostTransform(c => GlutenNoopWriterRule(c.session)) injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session)) injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectPostTransform(_ => RemoveFallbackTagRule()) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 126417bf18a5..54b5a3463991 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec} -import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable} import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.vectorized.ColumnarBatch @@ -133,19 +132,33 @@ object GlutenWriterColumnarRules { } } - case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { + private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { + child match { + // if the child is columnar, we can just wrap & transfer the columnar data + case c2r: ColumnarToRowExecBase => + command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) + // If the child is aqe, we make aqe "support columnar", + // then aqe itself will guarantee to generate columnar outputs. + // So FakeRowAdaptor will always consumes columnar data, + // thus avoiding the case of c2r->aqe->r2c->writer + case aqe: AdaptiveSparkPlanExec => + command.withNewChildren( + Array( + FakeRowAdaptor( + AdaptiveSparkPlanExec( + aqe.inputPlan, + aqe.context, + aqe.preprocessingRules, + aqe.isSubquery, + supportsColumnar = true + )))) + case other => command.withNewChildren(Array(FakeRowAdaptor(other))) + } + } - private val NOOP_WRITE = "org.apache.spark.sql.execution.datasources.noop.NoopWrite$" + case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { - case rc @ AppendDataExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) - case rc @ OverwriteByExpressionExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) case rc @ DataWritingCommandExec(cmd, child) => // The same thread can set these properties in the last query submission. val fields = child.output.toStructType.fields @@ -165,30 +178,6 @@ object GlutenWriterColumnarRules { case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply)) } - - private def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { - child match { - // if the child is columnar, we can just wrap&transfer the columnar data - case c2r: ColumnarToRowExecBase => - command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) - // If the child is aqe, we make aqe "support columnar", - // then aqe itself will guarantee to generate columnar outputs. - // So FakeRowAdaptor will always consumes columnar data, - // thus avoiding the case of c2r->aqe->r2c->writer - case aqe: AdaptiveSparkPlanExec => - command.withNewChildren( - Array( - FakeRowAdaptor( - AdaptiveSparkPlanExec( - aqe.inputPlan, - aqe.context, - aqe.preprocessingRules, - aqe.isSubquery, - supportsColumnar = true - )))) - case other => command.withNewChildren(Array(FakeRowAdaptor(other))) - } - } } def injectSparkLocalProperty(spark: SparkSession, format: Option[String]): Unit = { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala new file mode 100644 index 000000000000..bedf006510d3 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.noop + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor +import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} + +/** + * A rule that injects a FakeRowAdaptor for NoopWrite. + * + * The current V2 Command does not support columnar. Therefore, when its child node is a + * ColumnarNode, Vanilla Spark inserts a ColumnarToRow conversion between V2 Command and its child. + * This rule replaces the inserted ColumnarToRow with a FakeRowAdaptor, effectively bypassing the + * ColumnarToRow operation for NoopWrite. Since NoopWrite does not actually perform any data + * operations, it can accept input data in either row-based or columnar format. + */ +case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] { + override def apply(p: SparkPlan): SparkPlan = p match { + case rc @ AppendDataExec(_, _, NoopWrite) => + injectFakeRowAdaptor(rc, rc.child) + case rc @ OverwriteByExpressionExec(_, _, NoopWrite) => + injectFakeRowAdaptor(rc, rc.child) + case _ => p + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index f43a3977a3ce..45ed47cedce7 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1122,6 +1122,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") .exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 2c6b882850c4..0e56debcd5ed 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -164,14 +164,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", // "SPARK-33933", "SPARK-31220", @@ -1059,7 +1057,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 6d3c3e865d58..928dc38985ce 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -42,7 +49,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index f8b6092a46f7..ce9513c8cc9b 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1175,6 +1182,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 10abca1c6dd3..000000000000 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } - } -} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 126749f78c82..39794bd29079 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1118,6 +1118,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f83b91ede1cc..c12aae92fd31 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -70,7 +70,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] enableSuite[GlutenQueryCompilationErrorsDSv2Suite] @@ -189,14 +188,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", "SPARK-33933", "SPARK-31220", diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 441f3a60a3a9..779d264114cb 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf @@ -24,14 +25,20 @@ import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -40,7 +47,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1193,6 +1200,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..f9f0723e00cc 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1186,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 10abca1c6dd3..000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } - } -} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 829fae1cf590..f3b1786dfb6b 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -982,6 +982,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 498c2a3b4e57..c8fe551c444e 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite -import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructRowIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenV1WriteCommandSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite import org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, GlutenCSVv1Suite, GlutenCSVv2Suite} import org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite @@ -178,7 +178,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 49d47fa65b1f..2bd5a96dadee 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1182,6 +1187,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..6a3d6da27cef 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 59e69858017d..b9cbe62091cd 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -982,6 +982,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 904439e83e05..6f1813f13336 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -181,7 +181,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 2e5df7b859e3..bd941586d73c 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1197,6 +1202,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..6a3d6da27cef 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala new file mode 100644 index 000000000000..ebf17444e623 --- /dev/null +++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.datasources + +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.utils.{BackendTestUtils, SystemParameters} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{GlutenQueryTest, SaveMode} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenNoopWriterRuleSuite extends GlutenQueryTest with SharedSparkSession { + + override def sparkConf: SparkConf = { + val conf = super.sparkConf + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.default.parallelism", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.ui.enabled", "false") + .set("spark.gluten.ui.enabled", "false") + if (BackendTestUtils.isCHBackendLoaded()) { + conf.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath) + } + conf + } + + class WriterColumnarListener extends QueryExecutionListener { + var fakeRowAdaptor: Option[FakeRowAdaptor] = None + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + test("writing to noop") { + withTempDir { + dir => + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) + } + } + } +}