Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.gluten.vectorized.{BlockOutputStream, CHColumnarBatchSerialize

import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
Expand All @@ -49,7 +50,7 @@ import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil}
Expand Down Expand Up @@ -559,9 +560,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
val batches = countsAndBytes.map(_._2)
val totalBatchesSize = batches.map(_.length).sum
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
if (rawSize >= GlutenConfig.get.maxBroadcastTableSize) {
throw new GlutenException(
s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
"Cannot broadcast the table that is larger than " +
s"${SparkMemoryUtil.bytesToString(GlutenConfig.get.maxBroadcastTableSize)}: " +
s"${SparkMemoryUtil.bytesToString(rawSize)}")
}
if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
throw new GlutenException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSeria

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper}
Expand All @@ -41,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
Expand Down Expand Up @@ -649,9 +650,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
.filter(_.getNumRows != 0)
.collect
val rawSize = serialized.map(_.getSerialized.length).sum
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
if (rawSize >= GlutenConfig.get.maxBroadcastTableSize) {
throw new SparkException(
s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
"Cannot broadcast the table that is larger than " +
s"${SparkMemoryUtil.bytesToString(GlutenConfig.get.maxBroadcastTableSize)}: " +
s"${SparkMemoryUtil.bytesToString(rawSize)}")
}
numOutputRows += serialized.map(_.getNumRows).sum
dataSize += rawSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.config

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, SQLConfProvider}

import com.google.common.collect.ImmutableList
Expand Down Expand Up @@ -364,6 +364,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
def enableColumnarCollectLimit: Boolean = getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
def getSupportedFlattenedExpressions: String = getConf(GLUTEN_SUPPORTED_FLATTENED_FUNCTIONS)

// Maximum allowed size in bytes for a broadcast table, read from the
// "spark.sql.maxBroadcastTableSize" conf (see SPARK_MAX_BROADCAST_TABLE_SIZE).
// The string value (default "8GB") is parsed into bytes via
// JavaUtils.byteStringAsBytes; callers compare the serialized broadcast
// payload size against this limit and fail the broadcast when exceeded.
def maxBroadcastTableSize: Long =
JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB"))
}

object GlutenConfig {
Expand Down Expand Up @@ -442,6 +445,7 @@ object GlutenConfig {
val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize"
val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
val SPARK_MAX_BROADCAST_TABLE_SIZE = "spark.sql.maxBroadcastTableSize"

def get: GlutenConfig = {
new GlutenConfig(SQLConf.get)
Expand Down