From 2543455ad7ee356dc163bafb3e36805a4c1e9d80 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 19 Mar 2025 09:07:26 -0700 Subject: [PATCH 1/2] initial commit --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++ .../sql/execution/exchange/BroadcastExchangeExec.scala | 8 +++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ca7d8ce037931..815d6d1841733 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1534,6 +1534,12 @@ object SQLConf { .timeConf(TimeUnit.SECONDS) .createWithDefaultString(s"${5 * 60}") + val MAX_BROADCAST_TABLE_SIZE = buildConf("spark.sql.maxBroadcastTableSize") + .doc("The maximum table size that can be broadcast in broadcast joins.") + .version("4.0.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(8L << 30) + val INTERRUPT_ON_CANCEL = buildConf("spark.sql.execution.interruptOnCancel") .doc("When true, all running tasks will be interrupted if one cancels a query.") .version("4.0.0") @@ -6155,6 +6161,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { if (timeoutValue < 0) Long.MaxValue else timeoutValue } + def maxBroadcastTableSizeInBytes: Long = getConf(MAX_BROADCAST_TABLE_SIZE) + def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME) def convertCTAS: Boolean = getConf(CONVERT_CTAS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 2565a14cef90b..c70ee637a2489 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -125,7 +125,6 @@ trait BroadcastExchangeLike extends Exchange { case class BroadcastExchangeExec( mode: BroadcastMode, child: SparkPlan) extends BroadcastExchangeLike { - import BroadcastExchangeExec._ override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), @@ -203,9 +202,10 @@ case class BroadcastExchangeExec( } longMetric("dataSize") += dataSize - if (dataSize >= MAX_BROADCAST_TABLE_BYTES) { + val maxBroadcastTableSizeInBytes = conf.maxBroadcastTableSizeInBytes + if (dataSize >= maxBroadcastTableSizeInBytes) { throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableBytesError( - MAX_BROADCAST_TABLE_BYTES, dataSize) + maxBroadcastTableSizeInBytes, dataSize) } val beforeBroadcast = System.nanoTime() @@ -268,8 +268,6 @@ case class BroadcastExchangeExec( } object BroadcastExchangeExec { - val MAX_BROADCAST_TABLE_BYTES = 8L << 30 - private[execution] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD))) From 4052912e113f3f85817b61ceb4141d43815f998d Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 19 Mar 2025 11:25:14 -0700 Subject: [PATCH 2/2] address comments --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 815d6d1841733..5e656b6a57c2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1535,8 +1535,8 @@ object SQLConf { .createWithDefaultString(s"${5 * 60}") val MAX_BROADCAST_TABLE_SIZE = buildConf("spark.sql.maxBroadcastTableSize") - 
.doc("The maximum table size that can be broadcast in broadcast joins.") - .version("4.0.0") + .doc("The maximum table size in bytes that can be broadcast in broadcast joins. Broadcasting a table whose size is at or above this value fails the query with an error.") + .version("4.1.0") .bytesConf(ByteUnit.BYTE) .createWithDefault(8L << 30)