From 13119c4a2851ecfc19efac7b4c1f5708cfb8e330 Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Sat, 4 Jan 2025 11:18:36 +0800 Subject: [PATCH 1/3] [CORE] Refactor columnar noop write rule --- .../backendsapi/clickhouse/CHRuleApi.scala | 2 + .../backendsapi/velox/VeloxRuleApi.scala | 3 + .../GlutenWriterColumnarRules.scala | 59 ++++++++----------- .../noop/GlutenNoopWriterRule.scala | 35 +++++++++++ .../utils/velox/VeloxTestSettings.scala | 2 +- .../ClickHouseAdaptiveQueryExecSuite.scala | 4 +- .../velox/VeloxAdaptiveQueryExecSuite.scala | 4 +- ....scala => GlutenNoopWriterRuleSuite.scala} | 24 ++++---- .../utils/velox/VeloxTestSettings.scala | 2 +- .../ClickHouseAdaptiveQueryExecSuite.scala | 4 +- .../velox/VeloxAdaptiveQueryExecSuite.scala | 4 +- ....scala => GlutenNoopWriterRuleSuite.scala} | 24 ++++---- .../utils/velox/VeloxTestSettings.scala | 3 +- .../ClickHouseAdaptiveQueryExecSuite.scala | 4 +- .../velox/VeloxAdaptiveQueryExecSuite.scala | 4 +- .../GlutenNoopWriterRuleSuite.scala | 50 ++++++++++++++++ .../utils/velox/VeloxTestSettings.scala | 1 + .../ClickHouseAdaptiveQueryExecSuite.scala | 4 +- .../velox/VeloxAdaptiveQueryExecSuite.scala | 4 +- .../GlutenNoopWriterRuleSuite.scala | 50 ++++++++++++++++ .../org/apache/gluten/GlutenConfig.scala | 9 +++ 21 files changed, 222 insertions(+), 74 deletions(-) create mode 100644 gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala rename gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/{GlutenWriterColumnarRulesSuite.scala => GlutenNoopWriterRuleSuite.scala} (68%) rename gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/{GlutenWriterColumnarRulesSuite.scala => GlutenNoopWriterRuleSuite.scala} (68%) create mode 100644 gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala create mode 100644 
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 05a50396fca3..e9e8b8351960 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.DeltaLogFileIndex import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.util.SparkPlanRules @@ -125,6 +126,7 @@ object CHRuleApi { c => intercept( SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session))) + injector.injectPost(c => GlutenNoopWriterRule.apply(c.session)) // Gluten columnar: Final rules. 
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 460547bc7154..89ded8cd0413 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -36,6 +36,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.joins.BaseJoinExec @@ -103,6 +104,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => each(c.session))) injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPost(c => GlutenNoopWriterRule(c.session)) // Gluten columnar: Final rules. 
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) @@ -175,6 +177,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPostTransform(c => each(c.session))) injector.injectPostTransform(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPostTransform(c => GlutenNoopWriterRule(c.session)) injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session)) injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectPostTransform(_ => RemoveFallbackTagRule()) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 126417bf18a5..68e309cf3fd9 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec} -import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable} import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.vectorized.ColumnarBatch @@ -133,19 +132,33 @@ object GlutenWriterColumnarRules { } } - case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { + private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { + child match { + // if the child is 
columnar, we can just wrap&transfer the columnar data + case c2r: ColumnarToRowExecBase => + command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) + // If the child is aqe, we make aqe "support columnar", + // then aqe itself will guarantee to generate columnar outputs. + // So FakeRowAdaptor will always consume columnar data, + // thus avoiding the case of c2r->aqe->r2c->writer + case aqe: AdaptiveSparkPlanExec => + command.withNewChildren( + Array( + FakeRowAdaptor( + AdaptiveSparkPlanExec( + aqe.inputPlan, + aqe.context, + aqe.preprocessingRules, + aqe.isSubquery, + supportsColumnar = true + )))) + case other => command.withNewChildren(Array(FakeRowAdaptor(other))) + } + } - private val NOOP_WRITE = "org.apache.spark.sql.execution.datasources.noop.NoopWrite$" + case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { - case rc @ AppendDataExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) - case rc @ OverwriteByExpressionExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) case rc @ DataWritingCommandExec(cmd, child) => // The same thread can set these properties in the last query submission. val fields = child.output.toStructType.fields @@ -165,30 +178,6 @@ object GlutenWriterColumnarRules { } - - private def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { - child match { - // if the child is columnar, we can just wrap&transfer the columnar data - case c2r: ColumnarToRowExecBase => - command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) - // If the child is aqe, we make aqe "support columnar", - // then aqe itself will guarantee to generate columnar outputs. 
- // So FakeRowAdaptor will always consumes columnar data, - // thus avoiding the case of c2r->aqe->r2c->writer - case aqe: AdaptiveSparkPlanExec => - command.withNewChildren( - Array( - FakeRowAdaptor( - AdaptiveSparkPlanExec( - aqe.inputPlan, - aqe.context, - aqe.preprocessingRules, - aqe.isSubquery, - supportsColumnar = true - )))) - case other => command.withNewChildren(Array(FakeRowAdaptor(other))) - } - } } def injectSparkLocalProperty(spark: SparkSession, format: Option[String]): Unit = { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala new file mode 100644 index 000000000000..b9823fc2ae7c --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.noop + +import org.apache.gluten.GlutenConfig + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor +import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} + +case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] { + override def apply(p: SparkPlan): SparkPlan = p match { + case rc @ AppendDataExec(_, _, NoopWrite) if GlutenConfig.get.enableNoopWriter => + injectFakeRowAdaptor(rc, rc.child) + case rc @ OverwriteByExpressionExec(_, _, NoopWrite) if GlutenConfig.get.enableNoopWriter => + injectFakeRowAdaptor(rc, rc.child) + case _ => p + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 2c6b882850c4..f2c154ce00a4 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -1059,7 +1059,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] + enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. 
.exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 6d3c3e865d58..b3f66c830bcd 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -42,7 +43,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index f8b6092a46f7..a10b4bc75e61 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.GlutenConfig import 
org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala similarity index 68% rename from gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala rename to gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala index a5d3f0e3f183..1c4046cd2fa5 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala @@ -16,13 +16,11 @@ */ package org.apache.spark.sql.execution.datasources -import org.apache.gluten.GlutenConfig - import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { +class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { class WriterColumnarListener extends QueryExecutionListener { var fakeRowAdaptor: Option[FakeRowAdaptor] = None @@ -37,17 +35,15 @@ class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { 
testGluten("writing to noop") { withTempDir { dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) } } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f83b91ede1cc..007f84286d59 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -70,7 +70,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] + enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenQueryCompilationErrorsDSv2Suite] diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 441f3a60a3a9..f8f96faa4de8 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf @@ -40,7 +41,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..c6d367932d5f 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class 
VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala similarity index 68% rename from gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala rename to gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala index a5d3f0e3f183..1c4046cd2fa5 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala @@ -16,13 +16,11 @@ */ package org.apache.spark.sql.execution.datasources -import org.apache.gluten.GlutenConfig - import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { +class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { class WriterColumnarListener extends QueryExecutionListener { var fakeRowAdaptor: Option[FakeRowAdaptor] = None @@ -37,17 +35,15 @@ class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { testGluten("writing to noop") { withTempDir { dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new 
WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) } } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 498c2a3b4e57..f9d5457279f8 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite -import 
org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructRowIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenV1WriteCommandSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite import org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, GlutenCSVv1Suite, GlutenCSVv2Suite} import org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite @@ -802,6 +802,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFileSourceStrategySuite] // Plan comparison. 
.exclude("partitioned table - after scan filters") + enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenHadoopFileLinesReaderSuite] enableSuite[GlutenPathFilterStrategySuite] enableSuite[GlutenPathFilterSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 49d47fa65b1f..73f34c990c91 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..c6d367932d5f 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala new file mode 100644 index 000000000000..1c4046cd2fa5 --- /dev/null +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { + + class WriterColumnarListener extends QueryExecutionListener { + var fakeRowAdaptor: Option[FakeRowAdaptor] = None + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + testGluten("writing to noop") { + withTempDir { + dir => + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) + } + } + } +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 904439e83e05..68ec6582893c 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -815,6 +815,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFileSourceStrategySuite] // Plan comparison. 
.exclude("partitioned table - after scan filters") + enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenHadoopFileLinesReaderSuite] enableSuite[GlutenPathFilterStrategySuite] enableSuite[GlutenPathFilterSuite] diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 2e5df7b859e3..d56b1426654d 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5..c6d367932d5f 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -41,7 +42,8 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") + .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala new file mode 100644 index 000000000000..1c4046cd2fa5 --- /dev/null +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { + + class WriterColumnarListener extends QueryExecutionListener { + var fakeRowAdaptor: Option[FakeRowAdaptor] = None + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + testGluten("writing to noop") { + withTempDir { + dir => + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) + } + } + } +} diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 2c03ecf3e824..40b7001c29b9 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -453,6 +453,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { // Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED) + def enableNoopWriter: Boolean = conf.getConf(NOOP_WRITER_ENABLED) + def enableNativeArrowReader: Boolean = 
conf.getConf(NATIVE_ARROW_READER_ENABLED) def directorySizeGuess: Long = @@ -1776,6 +1778,13 @@ object GlutenConfig { .booleanConf .createOptional + val NOOP_WRITER_ENABLED = + buildConf("spark.gluten.sql.noop.writer.enabled") + .internal() + .doc("Whether to enable noop writer. When true, Gluten will add FakeRowAdaptor to avoid c2r.") + .booleanConf + .createWithDefault(true) + val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED = buildConf("spark.gluten.sql.native.hive.writer.enabled") .internal() From b09a2d93a62cf3aa0ccfd7eb1090495a9676f18f Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Tue, 7 Jan 2025 13:36:39 +0800 Subject: [PATCH 2/3] remove noopWrite config and fix test failed --- .../GlutenWriterColumnarRules.scala | 2 +- .../noop/GlutenNoopWriterRule.scala | 6 +- .../clickhouse/ClickHouseTestSettings.scala | 3 + .../utils/velox/VeloxTestSettings.scala | 2 - .../ClickHouseAdaptiveQueryExecSuite.scala | 87 ++++++++++++++++++- .../velox/VeloxAdaptiveQueryExecSuite.scala | 87 ++++++++++++++++++- .../clickhouse/ClickHouseTestSettings.scala | 3 + .../utils/velox/VeloxTestSettings.scala | 2 - .../ClickHouseAdaptiveQueryExecSuite.scala | 87 ++++++++++++++++++- .../velox/VeloxAdaptiveQueryExecSuite.scala | 87 ++++++++++++++++++- .../clickhouse/ClickHouseTestSettings.scala | 2 + .../utils/velox/VeloxTestSettings.scala | 1 - .../ClickHouseAdaptiveQueryExecSuite.scala | 36 +++++++- .../velox/VeloxAdaptiveQueryExecSuite.scala | 36 +++++++- .../clickhouse/ClickHouseTestSettings.scala | 2 + .../utils/velox/VeloxTestSettings.scala | 1 - .../ClickHouseAdaptiveQueryExecSuite.scala | 36 +++++++- .../velox/VeloxAdaptiveQueryExecSuite.scala | 36 +++++++- .../apache/gluten/config/GlutenConfig.scala | 9 -- 19 files changed, 497 insertions(+), 28 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 68e309cf3fd9..54b5a3463991 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -134,7 +134,7 @@ object GlutenWriterColumnarRules { private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { child match { - // if the child is columnar, we can just wrap&transfer the columnar data + // if the child is columnar, we can just wrap & transfer the columnar data case c2r: ColumnarToRowExecBase => command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) // If the child is aqe, we make aqe "support columnar", diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala index 4c076161d7f4..976880a27bc9 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.noop -import org.apache.gluten.config.GlutenConfig - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan @@ -26,9 +24,9 @@ import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteB case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { - case rc @ AppendDataExec(_, _, NoopWrite) if GlutenConfig.get.enableNoopWriter => + case rc @ AppendDataExec(_, _, NoopWrite) => injectFakeRowAdaptor(rc, rc.child) - case rc @ OverwriteByExpressionExec(_, _, NoopWrite) if GlutenConfig.get.enableNoopWriter => + case rc 
@ OverwriteByExpressionExec(_, _, NoopWrite) => injectFakeRowAdaptor(rc, rc.child) case _ => p } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index f43a3977a3ce..45ed47cedce7 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1122,6 +1122,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") .exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f2c154ce00a4..ffe43a0618d5 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -164,14 +164,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", // "SPARK-33933", "SPARK-31220", diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 55eaf084876e..928dc38985ce 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -26,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -44,7 +50,6 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - 
.set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1198,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): 
Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 8777469aac7a..ce9513c8cc9b 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -26,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import 
org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -43,7 +49,6 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1177,6 +1182,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + 
spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 126749f78c82..39794bd29079 100644 --- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1118,6 +1118,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 007f84286d59..a6712d282942 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -189,14 +189,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", "SPARK-33933", "SPARK-31220", diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 1de772a7ac62..779d264114cb 100644 --- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -25,14 +25,20 @@ import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -42,7 +48,6 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1195,6 +1200,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + 
withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index b9d031a04458..f9f0723e00cc 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -26,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import 
org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -43,7 +49,6 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1181,6 +1186,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + 
df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 829fae1cf590..f3b1786dfb6b 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -982,6 +982,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change 
merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f9d5457279f8..9ed9b54faa52 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -178,7 +178,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index cb2592a0a4dc..2bd5a96dadee 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -26,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import 
org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -34,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -43,7 +47,6 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1184,6 +1187,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = 
plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index b9d031a04458..6a3d6da27cef 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -26,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -34,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -43,7 +47,6 @@ class VeloxAdaptiveQueryExecSuite 
extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1181,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 59e69858017d..b9cbe62091cd 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -982,6 +982,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 68ec6582893c..186588f48102 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -181,7 +181,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index d6d757ef8b14..bd941586d73c 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -26,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import 
org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -34,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -43,7 +47,6 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1199,6 +1202,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = 
classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index b9d031a04458..6a3d6da27cef 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -26,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -34,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -43,7 +47,6 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") - .set(GlutenConfig.NOOP_WRITER_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1181,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 875f5b1f667e..b1337c92eec1 100644 --- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -453,8 +453,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { // Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED) - def enableNoopWriter: Boolean = conf.getConf(NOOP_WRITER_ENABLED) - def enableNativeArrowReader: Boolean = conf.getConf(NATIVE_ARROW_READER_ENABLED) def directorySizeGuess: Long = @@ -1778,13 +1776,6 @@ object GlutenConfig { .booleanConf .createOptional - val NOOP_WRITER_ENABLED = - buildConf("spark.gluten.sql.noop.writer.enabled") - .internal() - .doc("Wether to enable noop writer. When true, Gluten will add FakeRowAdaptor to avoid c2r.") - .booleanConf - .createWithDefault(true) - val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED = buildConf("spark.gluten.sql.native.hive.writer.enabled") .internal() From 887e94ba9d574fd27eabe336fdbd24d0a0009f1a Mon Sep 17 00:00:00 2001 From: jackylee-ch Date: Wed, 8 Jan 2025 16:34:14 +0800 Subject: [PATCH 3/3] add comments --- .../noop/GlutenNoopWriterRule.scala | 9 ++++ .../utils/velox/VeloxTestSettings.scala | 1 - .../utils/velox/VeloxTestSettings.scala | 1 - .../GlutenNoopWriterRuleSuite.scala | 50 ------------------- .../utils/velox/VeloxTestSettings.scala | 1 - .../GlutenNoopWriterRuleSuite.scala | 50 ------------------- .../utils/velox/VeloxTestSettings.scala | 1 - .../GlutenNoopWriterRuleSuite.scala | 50 ------------------- .../GlutenNoopWriterRuleSuite.scala | 28 +++++++++-- 9 files changed, 33 insertions(+), 158 deletions(-) delete mode 100644 gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala delete mode 100644 
gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala delete mode 100644 gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala rename gluten-ut/{spark32/src/test/scala/org/apache/spark/sql/execution => test/src/test/scala/org/apache/spark/sql}/datasources/GlutenNoopWriterRuleSuite.scala (65%) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala index 976880a27bc9..bedf006510d3 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala @@ -22,6 +22,15 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} +/** + * A rule that injects a FakeRowAdaptor for NoopWrite. + * + * The current V2 Command does not support columnar. Therefore, when its child node is a + * ColumnarNode, Vanilla Spark inserts a ColumnarToRow conversion between V2 Command and its child. + * This rule replaces the inserted ColumnarToRow with a FakeRowAdaptor, effectively bypassing the + * ColumnarToRow operation for NoopWrite. Since NoopWrite does not actually perform any data + * operations, it can accept input data in either row-based or columnar format. 
+ */ case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { case rc @ AppendDataExec(_, _, NoopWrite) => diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index ffe43a0618d5..0e56debcd5ed 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -1057,7 +1057,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index a6712d282942..c12aae92fd31 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -70,7 +70,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenQueryCompilationErrorsDSv2Suite] diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala 
deleted file mode 100644 index 1c4046cd2fa5..000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } -} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 9ed9b54faa52..c8fe551c444e 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -801,7 +801,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFileSourceStrategySuite] // Plan comparison. 
.exclude("partitioned table - after scan filters") - enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenHadoopFileLinesReaderSuite] enableSuite[GlutenPathFilterStrategySuite] enableSuite[GlutenPathFilterSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala deleted file mode 100644 index 1c4046cd2fa5..000000000000 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } -} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 186588f48102..6f1813f13336 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -814,7 +814,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFileSourceStrategySuite] // Plan comparison. 
.exclude("partitioned table - after scan filters") - enableSuite[GlutenNoopWriterRuleSuite] enableSuite[GlutenHadoopFileLinesReaderSuite] enableSuite[GlutenPathFilterStrategySuite] enableSuite[GlutenPathFilterSuite] diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala deleted file mode 100644 index 1c4046cd2fa5..000000000000 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } -} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala similarity index 65% rename from gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala rename to gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala index 1c4046cd2fa5..ebf17444e623 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenNoopWriterRuleSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala @@ -14,13 +14,33 @@ * See the License for the specific 
language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.datasources -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.utils.{BackendTestUtils, SystemParameters} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{GlutenQueryTest, SaveMode} import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.util.QueryExecutionListener -class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { +class GlutenNoopWriterRuleSuite extends GlutenQueryTest with SharedSparkSession { + + override def sparkConf: SparkConf = { + val conf = super.sparkConf + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.default.parallelism", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.ui.enabled", "false") + .set("spark.gluten.ui.enabled", "false") + if (BackendTestUtils.isCHBackendLoaded()) { + conf.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath) + } + conf + } class WriterColumnarListener extends QueryExecutionListener { var fakeRowAdaptor: Option[FakeRowAdaptor] = None @@ -32,7 +52,7 @@ class GlutenNoopWriterRuleSuite extends GlutenSQLTestsBaseTrait { override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } - testGluten("writing to noop") { + test("writing to noop") { withTempDir { dir => spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)