From b24d313ffd4178edb17ceb5436b3bac827198098 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 14 Apr 2025 09:56:20 +0200 Subject: [PATCH 1/5] fixup --- .../org/apache/gluten/GlutenPlugin.scala | 14 +++++----- .../spark/sql/internal/SparkConfigUtil.scala | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) create mode 100644 shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index d01a23712e96..e749bfe93024 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.softaffinity.SoftAffinityListener import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, GlutenUIUtils} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SparkConfigUtil, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.spark.task.TaskResources import org.apache.spark.util.SparkResourceUtil @@ -137,13 +137,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { private def setPredefinedConfigs(conf: SparkConf): Unit = { // Spark SQL extensions - val extensions = if (conf.contains(SPARK_SESSION_EXTENSIONS.key)) { - s"${conf.get(SPARK_SESSION_EXTENSIONS.key)}," + - s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}" - } else { - s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}" + val extensionSeq = + SparkConfigUtil.getEntryValue(conf, SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty) + if (!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)) { + conf.set( + SPARK_SESSION_EXTENSIONS.key, + (extensionSeq ++ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME).mkString(",")) } - conf.set(SPARK_SESSION_EXTENSIONS.key, extensions) // adaptive custom cost evaluator class val enableGlutenCostEvaluator = conf.getBoolean( diff --git a/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala b/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala new file mode 100644 index 000000000000..945174073e48 --- /dev/null +++ b/shims/common/src/main/scala/org/apache/spark/sql/internal/SparkConfigUtil.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.internal + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigEntry + +object SparkConfigUtil { + def getEntryValue[T](conf: SparkConf, entry: ConfigEntry[T]): T = { + conf.get(entry) + } +} From 6aef7292fd5dec1b5e1318e4937e7d6e54653b0b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 14 Apr 2025 10:32:43 +0200 Subject: [PATCH 2/5] fixup --- gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index e749bfe93024..eb4e069be22e 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -142,7 +142,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { if (!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)) { conf.set( SPARK_SESSION_EXTENSIONS.key, - (extensionSeq ++ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME).mkString(",")) + (extensionSeq :+ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME).mkString(",")) } // adaptive custom cost evaluator class From 72074da63ed15b270ffdd2a6a1d3a0cc4e872ae9 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 14 Apr 2025 17:53:02 +0200 Subject: [PATCH 3/5] fixup --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 30f395451943..b94da4eafd8b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -290,7 +290,7 @@ object VeloxConfig { buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput") .internal() .doc(s"If true, combine small columnar batches together before sending to shuffle. " + - s"The default minimum output batch size is equal to 0.8 * ${COLUMNAR_MAX_BATCH_SIZE.key}") + s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}") .booleanConf .createWithDefault(true) From cbeb64efbebd52bbc2c715c00575dd374cf3852a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 14 Apr 2025 17:55:28 +0200 Subject: [PATCH 4/5] fixup --- .github/workflows/velox_backend.yml | 14 +++++++------- .../tags/{SkipTestTags.java => SkipTest.java} | 2 +- .../execution/DynamicOffHeapSizingSuite.scala | 4 ++-- .../apache/gluten/expression/VeloxUdfSuite.scala | 4 ++-- .../apache/gluten/fuzzer/RowToColumnarFuzzer.scala | 4 ++-- .../apache/gluten/fuzzer/ShuffleWriterFuzzer.scala | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) rename backends-velox/src/test/java/org/apache/gluten/tags/{SkipTestTags.java => SkipTest.java} (96%) diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml index 1481b65a9948..8e23d2d68e19 100644 --- a/.github/workflows/velox_backend.yml +++ b/.github/workflows/velox_backend.yml @@ -642,7 +642,7 @@ jobs: export SPARK_SCALA_VERSION=2.12 $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Piceberg \ -Pdelta -Phudi -DargLine="-Dspark.test.home=/opt/shims/spark32/spark_home/" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() uses: actions/upload-artifact@v4 @@ -729,7 +729,7 @@ jobs: java -version $MVN_CMD clean test -Pspark-3.3 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Phudi -Pspark-ut \ -DargLine="-Dspark.test.home=/opt/shims/spark33/spark_home/" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() uses: actions/upload-artifact@v4 @@ -823,7 +823,7 @@ jobs: export SPARK_HOME=/opt/shims/spark34/spark_home/ ls -l $SPARK_HOME $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Phudi -Pspark-ut \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags \ + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest \ -DargLine="-Dspark.test.home=$SPARK_HOME ${EXTRA_FLAGS}" - name: Upload test report if: always() @@ -918,7 +918,7 @@ jobs: java -version $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Phudi -Pspark-ut \ -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ ${EXTRA_FLAGS}" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() uses: actions/upload-artifact@v4 @@ -971,7 +971,7 @@ jobs: java -version $MVN_CMD clean test -Pspark-3.5 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Piceberg \ -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark35-scala-2.13/spark_home/" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() uses: actions/upload-artifact@v4 @@ -1057,7 +1057,7 @@ jobs: java -version $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Pspark-ut \ -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.ras.enabled=true" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report uses: actions/upload-artifact@v4 with: @@ -1141,7 +1141,7 @@ jobs: java -version $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg -Pdelta -Pspark-ut \ -DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/ -Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \ - -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest - name: Upload test report uses: actions/upload-artifact@v4 with: diff --git a/backends-velox/src/test/java/org/apache/gluten/tags/SkipTestTags.java b/backends-velox/src/test/java/org/apache/gluten/tags/SkipTest.java similarity index 96% rename from backends-velox/src/test/java/org/apache/gluten/tags/SkipTestTags.java rename to backends-velox/src/test/java/org/apache/gluten/tags/SkipTest.java index b58a17e7b142..b21e0a9d7893 100644 --- a/backends-velox/src/test/java/org/apache/gluten/tags/SkipTestTags.java +++ b/backends-velox/src/test/java/org/apache/gluten/tags/SkipTest.java @@ -23,4 +23,4 @@ @TagAnnotation @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD, ElementType.TYPE}) -public @interface SkipTestTags {} +public @interface SkipTest {} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala index 8ada8d1c81f2..e9c57f0ccda4 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala @@ -17,11 +17,11 @@ package org.apache.gluten.execution import org.apache.gluten.benchmarks.RandomParquetDataGenerator -import org.apache.gluten.tags.SkipTestTags +import org.apache.gluten.tags.SkipTest import org.apache.spark.SparkConf -@SkipTestTags +@SkipTest class DynamicOffHeapSizingSuite extends VeloxWholeStageTransformerSuite { override protected val resourcePath: String = "/tpch-data-parquet" override protected val fileFormat: String = "parquet" diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala index 564b65267875..0b22b51018de 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala @@ -17,7 +17,7 @@ package org.apache.gluten.expression import org.apache.gluten.execution.ProjectExecTransformer -import org.apache.gluten.tags.{SkipTestTags, UDFTest} +import org.apache.gluten.tags.{SkipTest, UDFTest} import org.apache.spark.SparkConf import org.apache.spark.sql.{GlutenQueryTest, Row, SparkSession} @@ -255,7 +255,7 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite { // /path/to/gluten/package/target/gluten-package-${project.version}.jar // -Dvelox.udf.lib.path=\ // /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so -@SkipTestTags +@SkipTest class VeloxUdfSuiteCluster extends VeloxUdfSuite { override val master: String = "local-cluster[2,2,1024]" diff --git a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/RowToColumnarFuzzer.scala b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/RowToColumnarFuzzer.scala index d60e577aca09..20efa3ffbc7f 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/RowToColumnarFuzzer.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/RowToColumnarFuzzer.scala @@ -18,13 +18,13 @@ package org.apache.gluten.fuzzer import org.apache.gluten.execution.RowToVeloxColumnarExec import org.apache.gluten.fuzzer.FuzzerResult.Successful -import org.apache.gluten.tags.{FuzzerTest, SkipTestTags} +import org.apache.gluten.tags.{FuzzerTest, SkipTest} import org.apache.spark.SparkConf import org.apache.spark.sql.DataFrame @FuzzerTest -@SkipTestTags +@SkipTest class RowToColumnarFuzzer extends FuzzerBase { override protected def sparkConf: SparkConf = { diff --git a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzer.scala b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzer.scala index 44f25b517b46..ba7d8b90c9ee 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzer.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/fuzzer/ShuffleWriterFuzzer.scala @@ -17,13 +17,13 @@ package org.apache.gluten.fuzzer import org.apache.gluten.fuzzer.FuzzerResult.Successful -import org.apache.gluten.tags.{FuzzerTest, SkipTestTags} +import org.apache.gluten.tags.{FuzzerTest, SkipTest} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec @FuzzerTest -@SkipTestTags +@SkipTest class ShuffleWriterFuzzer extends FuzzerBase { private val REPARTITION_SQL = (numPartitions: Int) => s"select /*+ REPARTITION($numPartitions) */ * from tbl" From 2a2de022fe832a5fa95b3c20e54fa0e24079f4ee Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 14 Apr 2025 18:20:39 +0200 Subject: [PATCH 5/5] fixup --- .../execution/DynamicOffHeapSizingSuite.scala | 2 +- .../DynamicOffHeapSizingMemoryTarget.java | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala index e9c57f0ccda4..30442c664582 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala @@ -51,7 +51,7 @@ class DynamicOffHeapSizingSuite extends VeloxWholeStageTransformerSuite { getRootCause(e.getCause) } - test("Dynamic Off-Heap Sizing") { + test("Dynamic off-heap sizing") { System.gc() dataGenerator.generateRandomData(spark, Some(outputPath)) spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl") diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java index 387a758ef856..4056df062ee8 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java @@ -43,12 +43,16 @@ public long borrow(long size) { return 0; } - long totalMemory = Runtime.getRuntime().totalMemory(); - long freeMemory = Runtime.getRuntime().freeMemory(); - long usedOnHeapBytes = (totalMemory - freeMemory); + // Only JVM shrinking can reclaim space from the total JVM memory. + // See https://github.com/apache/incubator-gluten/issues/9276. + long totalHeapMemory = Runtime.getRuntime().totalMemory(); + long freeHeapMemory = Runtime.getRuntime().freeMemory(); + long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get(); - if (size + usedOffHeapBytesNow + usedOnHeapBytes > MAX_MEMORY_IN_BYTES) { + // Adds the total JVM memory which is the actual memory the JVM occupied from the operating + // system into the counter. + if (size + usedOffHeapBytesNow + totalHeapMemory > MAX_MEMORY_IN_BYTES) { LOG.warn( String.format( "Failing allocation as unified memory is OOM. " @@ -56,9 +60,9 @@ public long borrow(long size) { + "Free On-heap: %d, Total On-heap: %d, " + "Max On-heap: %d, Allocation: %d.", usedOffHeapBytesNow, - usedOnHeapBytes, - freeMemory, - totalMemory, + totalHeapMemory - freeHeapMemory, + freeHeapMemory, + totalHeapMemory, MAX_MEMORY_IN_BYTES, size));