From 185fa65f5b8ae84b6b8a7dbf1ba216994e55b049 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 23 Aug 2024 11:04:11 +0800 Subject: [PATCH 1/7] WIP --- .../memtarget/ThrowOnOomMemoryTarget.java | 4 ++-- .../org/apache/gluten/GlutenPlugin.scala | 23 +++++++++++-------- .../apache/spark/util/SparkResourceUtil.scala | 12 ++++++++++ .../org/apache/gluten/GlutenConfig.scala | 22 ++++++++++++++---- 4 files changed, 45 insertions(+), 16 deletions(-) diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java index e6b6ba07eb6b..74e0cbb8779b 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java @@ -89,8 +89,8 @@ public long borrow(long size) { .append( String.format( "\t%s=%s", - GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(), - SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED()))) + GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(), + SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED()))) .append(System.lineSeparator()) .append( String.format( diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 6e3484dfa969..5d25de3aeb0f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { val minOffHeapSize = "1MB" if ( !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) && - (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) || - conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( + (!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) || + conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( minOffHeapSize)) ) { throw new GlutenException( - s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " + - s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize") + s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " + + s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize") } // Session's local time zone must be set. If not explicitly set by user, its default @@ -174,13 +174,16 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString) val onHeapSize: Long = - if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) { - conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY) + if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) { + conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY) } else { // 1GB default 1024 * 1024 * 1024 } + val overheadSize : Long = SparkResourceUtil.getMemoryOverheadSize(conf) + conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString) + // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap // size. Otherwise, the off-heap size is set to the value specified by the user (if any). // Note that this means that we will IGNORE the off-heap size specified by the user if the @@ -200,17 +203,17 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // The 300MB value, unfortunately, is hard-coded in Spark code. ((onHeapSize - (300 * 1024 * 1024)) * conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong - } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) { + } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) { // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution // memory pool, regardless of Spark option spark.memory.storageFraction. - conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY) + conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY) } else { // Default Spark Value. 0L } conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString) - conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString) + conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString) val offHeapPerTask = offHeapSize / taskSlots conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString) @@ -218,7 +221,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // If we are using dynamic off-heap sizing, we should also enable off-heap memory // officially. if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { - conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true") + conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true") // We already sized the off-heap per task in a conservative manner, so we can just // use it. diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala index f8c791fe1374..faadf3a27cb3 100644 --- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala @@ -18,6 +18,8 @@ package org.apache.spark.util import org.apache.spark.{SparkConf, SparkMasterRegex} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} +import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.internal.SQLConf object SparkResourceUtil extends Logging { @@ -80,4 +82,14 @@ object SparkResourceUtil extends Logging { def isLocalMaster(conf: SparkConf): Boolean = { Utils.isLocalMaster(conf) } + + def getMemoryOverheadSize(conf: SparkConf): Long = { + conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse { + val executorMemMib = conf.get(EXECUTOR_MEMORY) + val factor = + conf.getDouble("spark.executor.memoryOverheadFactor", 0.1D) + val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L) + ByteUnit.MiB.toBytes((executorMemMib * factor).toLong max minMib) + } + } } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index fa78060dad6c..c53f2f9f5b72 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -535,9 +535,11 @@ object GlutenConfig { val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend." // Private Spark configs. - val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory" - val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size" - val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled" + val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory" + val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead" + val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor" + val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size" + val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled" val SPARK_REDACTION_REGEX = "spark.redaction.regex" // For Soft Affinity Scheduling @@ -570,6 +572,7 @@ object GlutenConfig { // Added back to Spark Conf during executor initialization val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor" + val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = "spark.gluten.memoryOverhead.size.in.bytes" val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes" val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes" val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = @@ -762,9 +765,10 @@ object GlutenConfig { SPARK_SQL_PARQUET_COMPRESSION_CODEC, // datasource config end + GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, - GLUTEN_OFFHEAP_ENABLED, + SPARK_OFFHEAP_ENABLED, SESSION_LOCAL_TIMEZONE.key, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, SPARK_REDACTION_REGEX @@ -1244,6 +1248,16 @@ object GlutenConfig { .intConf .createWithDefaultString("-1") + val COLUMNAR_OVERHEAD_SIZE_IN_BYTES = + buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY) + .internal() + .doc( + "Must provide default value since non-execution operations " + + "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " + + "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("0") + val COLUMNAR_OFFHEAP_SIZE_IN_BYTES = buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY) .internal() From c924c220e2a6c39b1d2dc04d61779a758ca5cb6b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 23 Aug 2024 11:21:40 +0800 Subject: [PATCH 2/7] fixup --- cpp/core/config/GlutenConfig.h | 2 ++ .../main/scala/org/apache/spark/util/SparkResourceUtil.scala | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 31318ff0aa0c..cd5196aa8c13 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -38,6 +38,8 @@ const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles"; const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default"; +const std::string kSparkOverheadMemory = "spark.gluten.memoryOverhead.size.in.bytes"; + const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes"; const std::string kSparkTaskOffHeapMemory = "spark.gluten.memory.task.offHeap.size.in.bytes"; diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala index faadf3a27cb3..d298ab481a0c 100644 --- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala @@ -84,12 +84,13 @@ object SparkResourceUtil extends Logging { } def getMemoryOverheadSize(conf: SparkConf): Long = { - conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse { + val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse { val executorMemMib = conf.get(EXECUTOR_MEMORY) val factor = conf.getDouble("spark.executor.memoryOverheadFactor", 0.1D) val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L) - ByteUnit.MiB.toBytes((executorMemMib * factor).toLong max minMib) + (executorMemMib * factor).toLong max minMib } + ByteUnit.MiB.toBytes(overheadMib) } } From 6de2d4b4d995f19804515c82af41412a7e5aaf9a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 23 Aug 2024 11:34:55 +0800 Subject: [PATCH 3/7] fixup --- cpp/velox/compute/VeloxBackend.cc | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 8dc3ade80dec..dcfadc42c0c9 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -126,8 +126,18 @@ void VeloxBackend::init(const std::unordered_map& conf initUdf(); registerSparkTokenizer(); - // initialize the global memory manager for current process - facebook::velox::memory::MemoryManager::initialize({}); + // Initialize the global memory manager for current process. + auto sparkOverhead = backendConf_->get(kSparkOverheadMemory); + int64_t memoryManagerCapacity; + if (sparkOverhead.hasValue()) { + // 0.75 * total overhead memory is used for Velox global memory manager. + // FIXME: Make this configurable. + memoryManagerCapacity = sparkOverhead.value() * 0.75; + } else { + memoryManagerCapacity = facebook::velox::memory::kMaxMemory; + } + LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity; + facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = memoryManagerCapacity}); } facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { From 821a5145161c6d6b08bda293efac70bae8721019 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 23 Aug 2024 11:50:01 +0800 Subject: [PATCH 4/7] fixup --- .../scala/org/apache/gluten/execution/VeloxTPCHSuite.scala | 1 + .../src/main/scala/org/apache/gluten/GlutenPlugin.scala | 2 +- .../main/scala/org/apache/spark/util/SparkResourceUtil.scala | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala index 0e94c242c1db..b87f8c05eaa0 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala @@ -48,6 +48,7 @@ abstract class VeloxTPCHTableSupport extends VeloxWholeStageTransformerSuite { .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") // TODO Should enable this after fix the issue of native plan detail occasional disappearance // .set("spark.gluten.sql.injectNativePlanStringToExplain", "true") } diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 5d25de3aeb0f..5e0c1758d0b4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -181,7 +181,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { 1024 * 1024 * 1024 } - val overheadSize : Long = SparkResourceUtil.getMemoryOverheadSize(conf) + val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf) conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString) // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala index d298ab481a0c..890ea31b6f1b 100644 --- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala @@ -87,9 +87,9 @@ object SparkResourceUtil extends Logging { val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse { val executorMemMib = conf.get(EXECUTOR_MEMORY) val factor = - conf.getDouble("spark.executor.memoryOverheadFactor", 0.1D) + conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d) val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L) - (executorMemMib * factor).toLong max minMib + (executorMemMib * factor).toLong.max(minMib) } ByteUnit.MiB.toBytes(overheadMib) } From 8293418e7ef47422b34b9605629ec55bcb6ed766 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 23 Aug 2024 16:16:41 +0800 Subject: [PATCH 5/7] fixup --- .../test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala index b87f8c05eaa0..0e94c242c1db 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala @@ -48,7 +48,6 @@ abstract class VeloxTPCHTableSupport extends VeloxWholeStageTransformerSuite { .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") - .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") // TODO Should enable this after fix the issue of native plan detail occasional disappearance // .set("spark.gluten.sql.injectNativePlanStringToExplain", "true") } From e24d0b3744dc8eb81df74ba5c7c71aa6a649ee7a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 26 Aug 2024 10:24:00 +0800 Subject: [PATCH 6/7] fixup --- .../src/main/scala/org/apache/gluten/GlutenPlugin.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 5e0c1758d0b4..f120af0ef4eb 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -184,6 +184,12 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf) conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString) + conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString) + logWarning( + "Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" + + " temporary solution to avoid OOM by Velox's global memory pools." + + " See GLUTEN-6960 for more information.") + // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap // size. Otherwise, the off-heap size is set to the value specified by the user (if any). // Note that this means that we will IGNORE the off-heap size specified by the user if the From 66dc47f8a352b41499a7e25e2c4c115502324c51 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 26 Aug 2024 10:27:18 +0800 Subject: [PATCH 7/7] fixup --- gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index f120af0ef4eb..f775d78a15ac 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -184,6 +184,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf) conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString) + // FIXME: The following is a workaround. Remove once the causes are fixed. conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString) logWarning( "Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" +