From a8f038a64c0cf5dd10a1d864df25bf3d6e0feabf Mon Sep 17 00:00:00 2001 From: Marcus Markiewicz Date: Wed, 17 Apr 2024 10:54:54 -0400 Subject: [PATCH 1/4] Today, in Spark we specify the on-heap and off-heap memory sizes as a configuration value read at the beginning of executing a job. With this change, we are exposing a new feature that is enabled with a new spark.gluten.memory.dynamic.offHeap.sizing.enabled setting. When this setting is configured to true, the offheap setting will be ignored in Gluten and we will size the offheap as the same size as the spark.executor.memory setting. We will then proceed to enforce a total memory quota, calculated by the sum of what memory is committed and in use in the Java heap (calculated with Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) plus the tracked off-heap memory in TreeMemoryConsumer. When there is an allocation that would push us over this total amount of committed memory, we will fail the allocation and trigger an OOM. Note that with this change, we perform the "quota check" when an allocation in the native engine is reported to Gluten. In practice, this means that it is possible that the Java codebase can oversubscribe memory as it allocates, which is under the on-heap quota, although there is enough off-heap usage where we should fail the allocation. A test exercising this setting is part of this change. 
--- .../execution/DynamicOffHeapSizingTest.scala | 63 ++++++++++++++++ .../memtarget/ThrowOnOomMemoryTarget.java | 19 ++++- .../DynamicOffHeapSizingPolicyChecker.java | 71 +++++++++++++++++++ .../memtarget/spark/TreeMemoryConsumer.java | 20 +++++- .../memtarget/spark/TreeMemoryConsumers.java | 9 ++- .../org/apache/gluten/GlutenPlugin.scala | 51 +++++++++++-- .../org/apache/gluten/GlutenConfig.scala | 26 +++++++ 7 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala create mode 100644 gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala new file mode 100644 index 000000000000..c70ec73fdf78 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.benchmarks.RandomParquetDataGenerator +import org.apache.gluten.tags.SkipTestTags + +import org.apache.spark.SparkConf + +@SkipTestTags +class DynamicOffHeapSizingTest extends VeloxWholeStageTransformerSuite { + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis()) + private val outputPath = getClass.getResource("/").getPath + "dynamicoffheapsizing_output.parquet" + private val AGG_SQL = + """select f_1, count(DISTINCT f_1) + |from tbl group + |group by 1""".stripMargin + + override def beforeAll(): Unit = { + super.beforeAll() + } + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.executor.memory", "5GB") + .set("spark.gluten.memory.dynamic.offHeap.sizing.enabled", "true") + .set("spark.driver.maxResultSize", "50GB") + .set("spark.gluten.sql.debug", "true") + .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") + } + + def getRootCause(e: Throwable): Throwable = { + if (e.getCause == null) { + return e + } + getRootCause(e.getCause) + } + + test("Dynamic Off-Heap Sizing") { + System.gc() + dataGenerator.generateRandomData(spark, Some(outputPath)) + spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl") + val df = spark.sql(AGG_SQL) + df + } +} diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java index 6621f3b1683f..e6b6ba07eb6b 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java @@ -52,8 +52,10 @@ 
public long borrow(long size) { .append( String.format( "Not enough spark off-heap execution memory. Acquired: %s, granted: %s. " - + "Try tweaking config option spark.memory.offHeap.size to get larger space " - + "to run this application. %n", + + "Try tweaking config option spark.memory.offHeap.size to get larger " + + "space to run this application " + + "(if spark.gluten.memory.dynamic.offHeap.sizing.enabled " + + "is not enabled). %n", Utils.bytesToString(size), Utils.bytesToString(granted))) .append("Current config settings: ") .append(System.lineSeparator()) @@ -83,6 +85,19 @@ public long borrow(long size) { .getConfString( GlutenConfig$.MODULE$ .GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY())))) + .append(System.lineSeparator()) + .append( + String.format( + "\t%s=%s", + GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(), + SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED()))) + .append(System.lineSeparator()) + .append( + String.format( + "\t%s=%s", + GlutenConfig$.MODULE$.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED(), + SQLConf.get() + .getConfString(GlutenConfig$.MODULE$.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED()))) .append(System.lineSeparator()); // Dump all consumer usages to exception body errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target)); diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java new file mode 100644 index 000000000000..ef9f10e5bd91 --- /dev/null +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.memory.memtarget.spark; + +import org.apache.gluten.GlutenConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + +public final class DynamicOffHeapSizingPolicyChecker { + private static final Logger LOG = + LoggerFactory.getLogger(DynamicOffHeapSizingPolicyChecker.class); + private final long maxOnHeapMemoryInBytes = GlutenConfig.getConf().onHeapMemorySize(); + private final AtomicLong usedOffHeapBytes = new AtomicLong(); + + DynamicOffHeapSizingPolicyChecker() {} + + public boolean canBorrow(long size) { + if (size == 0) { + return true; + } + + long totalMemory = Runtime.getRuntime().totalMemory(); + long freeMemory = Runtime.getRuntime().freeMemory(); + long usedOnHeapBytes = (totalMemory - freeMemory); + long usedOffHeapBytesNow = this.usedOffHeapBytes.get(); + + if (size + usedOffHeapBytesNow + usedOnHeapBytes > maxOnHeapMemoryInBytes) { + LOG.warn( + String.format( + "Failing allocation as unified memory is OOM. 
" + + "Used Off-heap: %d, Used On-Heap: %d," + + "Free On-heap: %d, Total On-heap: %d," + + "Max On-heap: %d, Allocation: %d.", + usedOffHeapBytesNow, + usedOnHeapBytes, + freeMemory, + totalMemory, + maxOnHeapMemoryInBytes, + size)); + + return false; + } + + return true; + } + + public void borrow(long size) { + usedOffHeapBytes.addAndGet(size); + } + + public void repay(long size) { + usedOffHeapBytes.addAndGet(-size); + } +} diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java index f86295b0697f..4f991275ecda 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java @@ -56,9 +56,12 @@ public class TreeMemoryConsumer extends MemoryConsumer implements TreeMemoryTarg private final SimpleMemoryUsageRecorder recorder = new SimpleMemoryUsageRecorder(); private final Map children = new HashMap<>(); private final String name = MemoryTargetUtil.toUniqueName("Gluten.Tree"); + private final DynamicOffHeapSizingPolicyChecker policyChecker; - TreeMemoryConsumer(TaskMemoryManager taskMemoryManager) { + TreeMemoryConsumer( + TaskMemoryManager taskMemoryManager, DynamicOffHeapSizingPolicyChecker policyChecker) { super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.OFF_HEAP); + this.policyChecker = policyChecker; } @Override @@ -67,8 +70,18 @@ public long borrow(long size) { // or Spark complains about the zero size by throwing an error return 0; } + + if (this.policyChecker != null && !this.policyChecker.canBorrow(size)) { + return 0; + } + long acquired = acquireMemory(size); recorder.inc(acquired); + + if (this.policyChecker != null) { + this.policyChecker.borrow(acquired); + } + return acquired; } @@ -81,6 +94,11 @@ public long repay(long size) { freeMemory(toFree); 
Preconditions.checkArgument(getUsed() >= 0); recorder.inc(-toFree); + + if (this.policyChecker != null) { + this.policyChecker.repay(toFree); + } + return toFree; } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java index 46257d80e7dd..1c1861f9b849 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java @@ -64,9 +64,14 @@ public static class Factory { private static final ReferenceMap MAP = new ReferenceMap(ReferenceMap.WEAK, ReferenceMap.WEAK); private final long perTaskCapacity; + private final DynamicOffHeapSizingPolicyChecker dynamicOffHeapSizingPolicyChecker; private Factory(long perTaskCapacity) { this.perTaskCapacity = perTaskCapacity; + this.dynamicOffHeapSizingPolicyChecker = + GlutenConfig.getConf().dynamicOffHeapSizingEnabled() + ? 
new DynamicOffHeapSizingPolicyChecker() + : null; } @SuppressWarnings("unchecked") @@ -76,7 +81,9 @@ private TreeMemoryTarget getSharedAccount(TaskMemoryManager tmm) { MAP.computeIfAbsent( tmm, m -> { - TreeMemoryTarget tmc = new TreeMemoryConsumer((TaskMemoryManager) m); + TreeMemoryTarget tmc = + new TreeMemoryConsumer( + (TaskMemoryManager) m, this.dynamicOffHeapSizingPolicyChecker); return tmc.newChild( "root", perTaskCapacity, Collections.emptyList(), Collections.emptyMap()); }); diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 972b6ddbaeca..7becb583c4be 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -146,9 +146,10 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // check memory off-heap enabled and size val minOffHeapSize = "1MB" if ( - !conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) || - conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( - minOffHeapSize) + !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) && + (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) || + conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( + minOffHeapSize)) ) { throw new GlutenException( s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " + @@ -162,13 +163,51 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // task slots val taskSlots = SparkResourceUtil.getTaskSlots(conf) - // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution memory - // pool, regardless of Spark option spark.memory.storageFraction. 
- val offHeapSize = conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY) + var onHeapSize: Long = + if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) { + + conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY) + } else { + // 1GB default + 1024 * 1024 * 1024 + } + + conf.set(GlutenConfig.GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY, onHeapSize.toString) + + // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap + // size. Otherwise, the off-heap size is set to the value specified by the user (if any). + // Note that this means that we will IGNORE the off-heap size specified by the user if the + // dynamic off-heap feature is enabled. + var offHeapSize: Long = + if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { + // Since when dynamic off-heap sizing is enabled, we commingle on-heap + // and off-heap memory, we set the off-heap size to the on-heap size. + onHeapSize + } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) { + // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution + // memory pool, regardless of Spark option spark.memory.storageFraction. + conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY) + } else { + // Default Spark Value. + 0L + } + conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString) + conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString) + val offHeapPerTask = offHeapSize / taskSlots conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString) + // If we are using dynamic off-heap sizing, we should also enable off-heap memory + // officially. + if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { + conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true") + } else { + // Let's make sure this is set to false explicitly if it is not on as it + // is looked up when throwing OOF exceptions. 
+ conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false") + } + // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory // determined by spark.memory.storageFraction was used. val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d) diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 60ff95a7ebb4..945ab9b8408c 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -378,6 +378,11 @@ class GlutenConfig(conf: SQLConf) extends Logging { def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL) def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED) + + def onHeapMemorySize: Long = conf.getConf(COLUMNAR_ONHEAP_SIZE_IN_BYTES) + + def dynamicOffHeapSizingEnabled: Boolean = + conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) } object GlutenConfig { @@ -449,6 +454,7 @@ object GlutenConfig { val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend." // Private Spark configs. 
+ val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory" val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size" val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled" @@ -480,6 +486,7 @@ object GlutenConfig { val GLUTEN_DEBUG_KEEP_JNI_WORKSPACE = "spark.gluten.sql.debug.keepJniWorkspace" // Added back to Spark Conf during executor initialization + val GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.onHeap.size.in.bytes" val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes" val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes" val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = @@ -524,6 +531,8 @@ object GlutenConfig { val GLUTEN_UI_ENABLED = "spark.gluten.ui.enabled" + val GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED = "spark.gluten.memory.dynamic.offHeap.sizing.enabled" + var ins: GlutenConfig = _ def getConf: GlutenConfig = { @@ -1758,4 +1767,21 @@ object GlutenConfig { .internal() .booleanConf .createWithDefault(true) + + val DYNAMIC_OFFHEAP_SIZING_ENABLED = + buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED) + .internal() + .doc("Enable using free on-heap memory as off-heap memory.") + .booleanConf + .createWithDefault(false) + + val COLUMNAR_ONHEAP_SIZE_IN_BYTES = + buildConf(GlutenConfig.GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY) + .internal() + .doc( + "Must provide default value since non-execution operations " + + "(e.g. 
org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " + + "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("0") } From 8c7cfa59bf9f8c015e16c225d3d14c7801a977dd Mon Sep 17 00:00:00 2001 From: Marcus Markiewicz Date: Mon, 6 May 2024 15:08:09 -0400 Subject: [PATCH 2/4] Address PR feedback: * Make it clear the proposed feature is experimental; * Document the caveat of possible memory over-subscription; * Use a new MemoryTarget class instead of changing TreeMemoryConsumer; * Take into considering Spark's approach of sizing on-heap with a reserved 300MB and memory fraction; --- ... => DynamicOffHeapSizingMemoryTarget.java} | 60 +++++++++++++------ .../memory/memtarget/MemoryTargetVisitor.java | 2 + .../memory/memtarget/MemoryTargets.java | 9 ++- .../memtarget/spark/TreeMemoryConsumer.java | 20 +------ .../memtarget/spark/TreeMemoryConsumers.java | 9 +-- .../org/apache/gluten/GlutenPlugin.scala | 35 +++++++---- .../apache/spark/memory/SparkMemoryUtil.scala | 7 ++- .../org/apache/gluten/GlutenConfig.scala | 12 +++- 8 files changed, 95 insertions(+), 59 deletions(-) rename gluten-core/src/main/java/org/apache/gluten/memory/memtarget/{spark/DynamicOffHeapSizingPolicyChecker.java => DynamicOffHeapSizingMemoryTarget.java} (52%) diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java similarity index 52% rename from gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java rename to gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java index ef9f10e5bd91..854a3a577435 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/DynamicOffHeapSizingPolicyChecker.java +++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.memory.memtarget.spark; +package org.apache.gluten.memory.memtarget; import org.apache.gluten.GlutenConfig; @@ -23,25 +23,30 @@ import java.util.concurrent.atomic.AtomicLong; -public final class DynamicOffHeapSizingPolicyChecker { - private static final Logger LOG = - LoggerFactory.getLogger(DynamicOffHeapSizingPolicyChecker.class); - private final long maxOnHeapMemoryInBytes = GlutenConfig.getConf().onHeapMemorySize(); - private final AtomicLong usedOffHeapBytes = new AtomicLong(); +public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget { + private static final Logger LOG = LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class); + private final MemoryTarget delegated; + // When dynamic off-heap sizing is enabled, the off-heap should be sized for the total usable + // memory, so we can use it as the max memory we will use. 
+ private static final long MAX_MEMORY_IN_BYTES = GlutenConfig.getConf().offHeapMemorySize(); + private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong(); - DynamicOffHeapSizingPolicyChecker() {} + public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) { + this.delegated = delegated; + } - public boolean canBorrow(long size) { + @Override + public long borrow(long size) { if (size == 0) { - return true; + return 0; } long totalMemory = Runtime.getRuntime().totalMemory(); long freeMemory = Runtime.getRuntime().freeMemory(); long usedOnHeapBytes = (totalMemory - freeMemory); - long usedOffHeapBytesNow = this.usedOffHeapBytes.get(); + long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get(); - if (size + usedOffHeapBytesNow + usedOnHeapBytes > maxOnHeapMemoryInBytes) { + if (size + usedOffHeapBytesNow + usedOnHeapBytes > MAX_MEMORY_IN_BYTES) { LOG.warn( String.format( "Failing allocation as unified memory is OOM. " @@ -52,20 +57,39 @@ public boolean canBorrow(long size) { usedOnHeapBytes, freeMemory, totalMemory, - maxOnHeapMemoryInBytes, + MAX_MEMORY_IN_BYTES, size)); - return false; + return 0; } - return true; + long reserved = delegated.borrow(size); + + USED_OFFHEAP_BYTES.addAndGet(reserved); + + return reserved; + } + + @Override + public long repay(long size) { + long unreserved = delegated.repay(size); + + USED_OFFHEAP_BYTES.addAndGet(-unreserved); + + return unreserved; + } + + @Override + public long usedBytes() { + return delegated.usedBytes(); } - public void borrow(long size) { - usedOffHeapBytes.addAndGet(size); + @Override + public T accept(MemoryTargetVisitor visitor) { + return visitor.visit(this); } - public void repay(long size) { - usedOffHeapBytes.addAndGet(-size); + public MemoryTarget delegated() { + return delegated; } } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java index 
caff2605d923..e58dbb295b08 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java @@ -33,4 +33,6 @@ public interface MemoryTargetVisitor { T visit(LoggingMemoryTarget loggingMemoryTarget); T visit(NoopMemoryTarget noopMemoryTarget); + + T visit(DynamicOffHeapSizingMemoryTarget dynamicOffHeapSizingMemoryTarget); } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java index ccb4beee8475..ec68f1469280 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java @@ -54,6 +54,13 @@ public static MemoryTarget newConsumer( } else { factory = TreeMemoryConsumers.shared(); } - return factory.newConsumer(tmm, name, spillers, virtualChildren); + + final MemoryTarget memoryTarget = factory.newConsumer(tmm, name, spillers, virtualChildren); + + if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { + return new DynamicOffHeapSizingMemoryTarget(memoryTarget); + } + + return memoryTarget; } } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java index 4f991275ecda..f86295b0697f 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumer.java @@ -56,12 +56,9 @@ public class TreeMemoryConsumer extends MemoryConsumer implements TreeMemoryTarg private final SimpleMemoryUsageRecorder recorder = new SimpleMemoryUsageRecorder(); private final Map children = new HashMap<>(); private final String name = 
MemoryTargetUtil.toUniqueName("Gluten.Tree"); - private final DynamicOffHeapSizingPolicyChecker policyChecker; - TreeMemoryConsumer( - TaskMemoryManager taskMemoryManager, DynamicOffHeapSizingPolicyChecker policyChecker) { + TreeMemoryConsumer(TaskMemoryManager taskMemoryManager) { super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.OFF_HEAP); - this.policyChecker = policyChecker; } @Override @@ -70,18 +67,8 @@ public long borrow(long size) { // or Spark complains about the zero size by throwing an error return 0; } - - if (this.policyChecker != null && !this.policyChecker.canBorrow(size)) { - return 0; - } - long acquired = acquireMemory(size); recorder.inc(acquired); - - if (this.policyChecker != null) { - this.policyChecker.borrow(acquired); - } - return acquired; } @@ -94,11 +81,6 @@ public long repay(long size) { freeMemory(toFree); Preconditions.checkArgument(getUsed() >= 0); recorder.inc(-toFree); - - if (this.policyChecker != null) { - this.policyChecker.repay(toFree); - } - return toFree; } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java index 1c1861f9b849..46257d80e7dd 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java @@ -64,14 +64,9 @@ public static class Factory { private static final ReferenceMap MAP = new ReferenceMap(ReferenceMap.WEAK, ReferenceMap.WEAK); private final long perTaskCapacity; - private final DynamicOffHeapSizingPolicyChecker dynamicOffHeapSizingPolicyChecker; private Factory(long perTaskCapacity) { this.perTaskCapacity = perTaskCapacity; - this.dynamicOffHeapSizingPolicyChecker = - GlutenConfig.getConf().dynamicOffHeapSizingEnabled() - ? 
new DynamicOffHeapSizingPolicyChecker() - : null; } @SuppressWarnings("unchecked") @@ -81,9 +76,7 @@ private TreeMemoryTarget getSharedAccount(TaskMemoryManager tmm) { MAP.computeIfAbsent( tmm, m -> { - TreeMemoryTarget tmc = - new TreeMemoryConsumer( - (TaskMemoryManager) m, this.dynamicOffHeapSizingPolicyChecker); + TreeMemoryTarget tmc = new TreeMemoryConsumer((TaskMemoryManager) m); return tmc.newChild( "root", perTaskCapacity, Collections.emptyList(), Collections.emptyMap()); }); diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 69235fa58f2b..7a0f2a63c760 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -164,7 +164,6 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { var onHeapSize: Long = if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) { - conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY) } else { // 1GB default @@ -180,8 +179,16 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { var offHeapSize: Long = if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { // Since when dynamic off-heap sizing is enabled, we commingle on-heap - // and off-heap memory, we set the off-heap size to the on-heap size. - onHeapSize + // and off-heap memory, we set the off-heap size to the usable on-heap size. We will + // size it using the same way that Spark sizes on-heap memory: + // + // spark.memory.fraction * (spark.executor.memory - 300MB). + // + // We will be careful to use the same configuration settings as Spark to ensure + // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory. + // The 300MB value, unfortunately, is hard-coded in Spark code. 
+ ((onHeapSize - (300 * 1024 * 1024)) * + conf.getDouble("spark.memory.memoryFraction", 0.6d)).toLong } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) { // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution // memory pool, regardless of Spark option spark.memory.storageFraction. @@ -201,19 +208,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // officially. if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true") + + // We already sized the off-heap per task in a conservative manner, so we can just + // use it. + conf.set( + GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, + offHeapPerTask.toString) } else { // Let's make sure this is set to false explicitly if it is not on as it // is looked up when throwing OOF exceptions. conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false") - } - // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory - // determined by spark.memory.storageFraction was used. - val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d) - val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots - conf.set( - GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, - conservativeOffHeapPerTask.toString) + // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory + // determined by spark.memory.storageFraction was used. 
+ val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d) + val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots + conf.set( + GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, + conservativeOffHeapPerTask.toString) + } // disable vanilla columnar readers, to prevent columnar-to-columnar conversions if (BackendsApiManager.getSettings.disableVanillaColumnarReaders(conf)) { diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala index 8bf88ef7d15c..48ed08fb71ce 100644 --- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.memory -import org.apache.gluten.memory.memtarget.{KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets} +import org.apache.gluten.memory.memtarget.{DynamicOffHeapSizingMemoryTarget, KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets} import org.apache.gluten.memory.memtarget.spark.{RegularMemoryConsumer, TreeMemoryConsumer} import org.apache.gluten.proto.MemoryUsageStats @@ -117,6 +117,11 @@ object SparkMemoryUtil { override def visit(noopMemoryTarget: NoopMemoryTarget): KnownNameAndStats = { noopMemoryTarget } + + override def visit(dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget) + : KnownNameAndStats = { + dynamicOffHeapSizingMemoryTarget.delegated().accept(this) + } }) } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index aa53f77db74d..81a0b857cdec 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -1834,7 +1834,17 @@ object GlutenConfig { val DYNAMIC_OFFHEAP_SIZING_ENABLED = buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED) .internal() - .doc("Enable using free on-heap memory as off-heap memory.") + .doc( + "Experimental: When set to true, the offheap config (spark.memory.offHeap.size) will" + + "be ignored and instead we will consider onheap and offheap memory in combination," + + "both counting towards the executor memory config (spark.executor.memory). We will" + + "make use of JVM APIs to determine how much onheap memory is use, alongside tracking" + + "offheap allocations made by Gluten. We will then proceed to enforcing a total memory" + + "quota, calculated by the sum of what memory is committed and in use in the Java heap." + + "Since the calculation of the total quota happens as offheap allocation happens and not" + + "as JVM heap memory is allocates, it is possible that we can oversubscribe memory." + + "Additionally, note that this change is experimental and may have performance " + + "implications.") .booleanConf .createWithDefault(false) From 844586332533d36dbce846906b62a9ca3398ccb1 Mon Sep 17 00:00:00 2001 From: Marcus Markiewicz Date: Wed, 8 May 2024 11:02:32 -0400 Subject: [PATCH 3/4] Address PR feedback * Added function as asked; * After further experimentation, decided to introduce an (optional) aggressive config that allows for changing the memory fraction used in the total memory calculation, but without changing Spark's memory fraction. 
This allows for fine-tuning / more aggressive usage if so desired; * Fixed typos and missing spaces in some of the exception and feature doc text; * Removed unnecessary test settings that do not impact the feature to speed things up a bit; --- .../execution/DynamicOffHeapSizingTest.scala | 9 ++---- .../DynamicOffHeapSizingMemoryTarget.java | 4 +-- .../memory/memtarget/MemoryTargets.java | 16 +++++----- .../org/apache/gluten/GlutenPlugin.scala | 8 +++-- .../org/apache/gluten/GlutenConfig.scala | 31 +++++++++++++------ 5 files changed, 41 insertions(+), 27 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala index c70ec73fdf78..56fc6eac3e11 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala @@ -39,11 +39,9 @@ class DynamicOffHeapSizingTest extends VeloxWholeStageTransformerSuite { override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.executor.memory", "5GB") + .set("spark.executor.memory", "6GB") + .set("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction", "0.8") .set("spark.gluten.memory.dynamic.offHeap.sizing.enabled", "true") - .set("spark.driver.maxResultSize", "50GB") - .set("spark.gluten.sql.debug", "true") - .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") } def getRootCause(e: Throwable): Throwable = { @@ -57,7 +55,6 @@ class DynamicOffHeapSizingTest extends VeloxWholeStageTransformerSuite { System.gc() dataGenerator.generateRandomData(spark, Some(outputPath)) spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl") - val df = spark.sql(AGG_SQL) - df + spark.sql(AGG_SQL) } } diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java index 854a3a577435..b7f15d830bed 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java @@ -50,8 +50,8 @@ public long borrow(long size) { LOG.warn( String.format( "Failing allocation as unified memory is OOM. " - + "Used Off-heap: %d, Used On-Heap: %d," - + "Free On-heap: %d, Total On-heap: %d," + + "Used Off-heap: %d, Used On-Heap: %d, " + + "Free On-heap: %d, Total On-heap: %d, " + "Max On-heap: %d, Allocation: %d.", usedOffHeapBytesNow, usedOnHeapBytes, diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java index ec68f1469280..2d6fc0748464 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java @@ -43,6 +43,14 @@ public static MemoryTarget overAcquire( return new OverAcquire(target, overTarget, overAcquiredRatio); } + public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) { + if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { + return new DynamicOffHeapSizingMemoryTarget(memoryTarget); + } + + return memoryTarget; + } + public static MemoryTarget newConsumer( TaskMemoryManager tmm, String name, @@ -55,12 +63,6 @@ public static MemoryTarget newConsumer( factory = TreeMemoryConsumers.shared(); } - final MemoryTarget memoryTarget = factory.newConsumer(tmm, name, spillers, virtualChildren); - - if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { - return new DynamicOffHeapSizingMemoryTarget(memoryTarget); - } - - return 
memoryTarget; + return dynamicOffHeapSizingIfEnabled(factory.newConsumer(tmm, name, spillers, virtualChildren)); } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 7a0f2a63c760..acb839ddecb4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -180,15 +180,17 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { // Since when dynamic off-heap sizing is enabled, we commingle on-heap // and off-heap memory, we set the off-heap size to the usable on-heap size. We will - // size it using the same way that Spark sizes on-heap memory: + // size it with a memory fraction, which can be set more aggressively; by default it + // is sized the same way that Spark sizes on-heap memory: // - // spark.memory.fraction * (spark.executor.memory - 300MB). + // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction * + // (spark.executor.memory - 300MB). // // We will be careful to use the same configuration settings as Spark to ensure // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory. // The 300MB value, unfortunately, is hard-coded in Spark code. ((onHeapSize - (300 * 1024 * 1024)) * - conf.getDouble("spark.memory.memoryFraction", 0.6d)).toLong + conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) { // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution // memory pool, regardless of Spark option spark.memory.storageFraction. 
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 81a0b857cdec..6297e9512f8e 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -549,6 +549,8 @@ object GlutenConfig { val GLUTEN_UI_ENABLED = "spark.gluten.ui.enabled" val GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED = "spark.gluten.memory.dynamic.offHeap.sizing.enabled" + val GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION = + "spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction" var ins: GlutenConfig = _ @@ -1835,19 +1837,30 @@ object GlutenConfig { buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED) .internal() .doc( - "Experimental: When set to true, the offheap config (spark.memory.offHeap.size) will" + - "be ignored and instead we will consider onheap and offheap memory in combination," + - "both counting towards the executor memory config (spark.executor.memory). We will" + - "make use of JVM APIs to determine how much onheap memory is use, alongside tracking" + - "offheap allocations made by Gluten. We will then proceed to enforcing a total memory" + - "quota, calculated by the sum of what memory is committed and in use in the Java heap." + - "Since the calculation of the total quota happens as offheap allocation happens and not" + - "as JVM heap memory is allocates, it is possible that we can oversubscribe memory." + - "Additionally, note that this change is experimental and may have performance " + + "Experimental: When set to true, the offheap config (spark.memory.offHeap.size) will " + + "be ignored and instead we will consider onheap and offheap memory in combination, " + + "both counting towards the executor memory config (spark.executor.memory). We will " + + "make use of JVM APIs to determine how much onheap memory is use, alongside tracking " + + "offheap allocations made by Gluten. 
We will then proceed to enforcing a total memory " + + "quota, calculated by the sum of what memory is committed and in use in the Java " + + "heap. Since the calculation of the total quota happens as offheap allocation happens " + + "and not as JVM heap memory is allocated, it is possible that we can oversubscribe " + + "memory. Additionally, note that this change is experimental and may have performance " + "implications.") .booleanConf .createWithDefault(false) + val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION = + buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION) + .internal() + .doc( + "Experimental: Determines the memory fraction used to determine the total " + + "memory available for offheap and onheap allocations when the dynamic offheap " + + "sizing feature is enabled. The default is set to match spark.executor.memoryFraction.") + .doubleConf + .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]") + .createWithDefault(0.6) + val COLUMNAR_ONHEAP_SIZE_IN_BYTES = buildConf(GlutenConfig.GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY) .internal() From 768c1866497193ff3377679a3f3d96ab71364e0a Mon Sep 17 00:00:00 2001 From: Marcus Markiewicz Date: Wed, 15 May 2024 09:45:28 -0400 Subject: [PATCH 4/4] Address PR feedback * Remove unused config --- .../main/scala/org/apache/gluten/GlutenPlugin.scala | 2 -- .../main/scala/org/apache/gluten/GlutenConfig.scala | 13 ------------- 2 files changed, 15 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index acb839ddecb4..cbbbd770cea6 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -170,8 +170,6 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { 1024 * 1024 * 1024 } - conf.set(GlutenConfig.GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY, onHeapSize.toString) - // If 
dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap // size. Otherwise, the off-heap size is set to the value specified by the user (if any). // Note that this means that we will IGNORE the off-heap size specified by the user if the diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 6297e9512f8e..4b120498d09d 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -394,8 +394,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED) - def onHeapMemorySize: Long = conf.getConf(COLUMNAR_ONHEAP_SIZE_IN_BYTES) - def dynamicOffHeapSizingEnabled: Boolean = conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) } @@ -501,7 +499,6 @@ object GlutenConfig { val GLUTEN_DEBUG_KEEP_JNI_WORKSPACE = "spark.gluten.sql.debug.keepJniWorkspace" // Added back to Spark Conf during executor initialization - val GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.onHeap.size.in.bytes" val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes" val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes" val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = @@ -1860,14 +1857,4 @@ object GlutenConfig { .doubleConf .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]") .createWithDefault(0.6) - - val COLUMNAR_ONHEAP_SIZE_IN_BYTES = - buildConf(GlutenConfig.GLUTEN_ONHEAP_SIZE_IN_BYTES_KEY) - .internal() - .doc( - "Must provide default value since non-execution operations " + - "(e.g. 
org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " + - "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated") - .bytesConf(ByteUnit.BYTE) - .createWithDefaultString("0") }