Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {

// Reads VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED; gates propagation of the
// ignore-null-keys behavior (see the config entry's doc for semantics).
def enablePropagateIgnoreNullKeys: Boolean =
getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)

def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
}

object VeloxConfig {
Expand Down Expand Up @@ -307,6 +309,19 @@ object VeloxConfig {
.booleanConf
.createWithDefault(true)

// Absolute cap (in bytes) for partial-aggregation memory. When set, it takes
// precedence over the ratio-based config below; it is only honored while
// flushable partial aggregation is enabled.
val MAX_PARTIAL_AGGREGATION_MEMORY =
  buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory")
    .internal()
    .doc(
      "Set the max memory of partial aggregation in bytes. When this option is set to a " +
        "value greater than 0, it will override spark.gluten.sql.columnar.backend.velox." +
        "maxPartialAggregationMemoryRatio. Note: this option only works when flushable " +
        "partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend." +
        "velox.flushablePartialAggregation=false."
    )
    .bytesConf(ByteUnit.BYTE)
    .createOptional

val MAX_PARTIAL_AGGREGATION_MEMORY_RATIO =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio")
.internal()
Expand Down Expand Up @@ -532,4 +547,15 @@ object VeloxConfig {
"avoid unnecessary aggregation on null keys.")
.booleanConf
.createWithDefault(true)

// Controls how strictly floating-point aggregation results must align with
// vanilla Spark: "strict" disables flushing for sum/avg over float/double,
// "loose" (default) allows it.
val FLOATING_POINT_MODE =
  buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
    .internal()
    .doc(
      "Config used to control the tolerance of floating point operations alignment with Spark. " +
        "When the mode is set to strict, flushing is disabled for sum(float/double) " +
        "and avg(float/double). When set to loose, flushing will be enabled.")
    .stringConf
    .checkValues(Set("loose", "strict"))
    .createWithDefault("loose")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial, PartialMerge}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.EXCHANGE
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.types.{DataType, DoubleType, FloatType}

/**
* To transform regular aggregation to intermediate aggregation that internally enables
Expand Down Expand Up @@ -61,6 +62,26 @@ case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkP
}
}

/**
 * Returns true when flushing must be disabled for this aggregation: the
 * floating-point mode is "strict" and at least one aggregate is a sum or
 * average over a float/double column. In "loose" mode always returns false.
 */
private def aggregatesNotSupportFlush(aggExprs: Seq[AggregateExpression]): Boolean = {
  def isFloatingPointType(dataType: DataType): Boolean =
    dataType == DoubleType || dataType == FloatType

  // Sum/avg over float/double are the functions whose flushed partial results
  // can diverge from Spark due to non-associative floating-point addition.
  def isUnsupportedAggregation(aggExpr: AggregateExpression): Boolean =
    aggExpr.aggregateFunction match {
      case Sum(child, _) if isFloatingPointType(child.dataType) => true
      case Average(child, _) if isFloatingPointType(child.dataType) => true
      case _ => false
    }

  // Avoid the non-idiomatic early `return`: evaluate the mode check as part of
  // a single boolean expression (short-circuits in loose mode).
  VeloxConfig.get.floatingPointMode != "loose" && aggExprs.exists(isUnsupportedAggregation)
}

private def replaceEligibleAggregates(plan: SparkPlan)(
func: RegularHashAggregateExecTransformer => SparkPlan): SparkPlan = {
def transformDown: SparkPlan => SparkPlan = {
Expand All @@ -72,6 +93,9 @@ case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkP
if isAggInputAlreadyDistributedWithAggKeys(agg) =>
// Data already grouped by aggregate keys, Skip.
agg
case agg: RegularHashAggregateExecTransformer
if aggregatesNotSupportFlush(agg.aggregateExpressions) =>
agg
case agg: RegularHashAggregateExecTransformer =>
func(agg)
case p if !canPropagate(p) => p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,60 @@ class VeloxAggregateFunctionsFlushSuite extends VeloxAggregateFunctionsSuite {
}
}
}

test("flushable aggregate rule - double sum when floatingPointMode is strict") {
  withSQLConf(
    "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory" -> "100",
    "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "false",
    "spark.gluten.sql.columnar.maxBatchSize" -> "2",
    "spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "strict"
  ) {
    withTempView("t1") {
      import testImplicits._
      Seq((24.621d, 1), (12.14d, 1), (0.169d, 1), (6.865d, 1), (1.879d, 1), (16.326d, 1))
        .toDF("c1", "c2")
        .createOrReplaceTempView("t1")
      runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group by c2") {
        df =>
          // Strict mode: both the partial and the final aggregation stay
          // regular (non-flushable), so two such nodes appear in the plan.
          val regularAggCount =
            getExecutedPlan(df).count(_.isInstanceOf[RegularHashAggregateExecTransformer])
          assert(regularAggCount == 2)
      }
    }
  }
}

test("flushable aggregate rule - double sum when floatingPointMode is loose") {
  withSQLConf(
    "spark.gluten.sql.columnar.backend.velox.floatingPointMode" -> "loose"
  ) {
    withTempView("t1") {
      import testImplicits._
      Seq((24.6d, 1), (12.1d, 1), (0.1d, 1), (6.8d, 1), (1.8d, 1), (16.3d, 1))
        .toDF("c1", "c2")
        .createOrReplaceTempView("t1")
      runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group by c2") {
        df =>
          // Loose mode: the partial aggregation is converted to a flushable
          // one, leaving exactly one regular and one flushable agg node.
          val plans = getExecutedPlan(df)
          assert(plans.count(_.isInstanceOf[RegularHashAggregateExecTransformer]) == 1)
          assert(plans.count(_.isInstanceOf[FlushableHashAggregateExecTransformer]) == 1)
      }
    }
  }
}
}

object VeloxAggregateFunctionsSuite {
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,9 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// FIXME this uses process-wise off-heap memory which is not for task
// partial aggregation memory config
auto offHeapMemory = veloxCfg_->get<int64_t>(kSparkTaskOffHeapMemory, facebook::velox::memory::kMaxMemory);
auto maxPartialAggregationMemory =
static_cast<long>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio, 0.1) * offHeapMemory));
auto maxPartialAggregationMemory = veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).has_value()
? veloxCfg_->get<int64_t>(kMaxPartialAggregationMemory).value()
: static_cast<int64_t>((veloxCfg_->get<double>(kMaxPartialAggregationMemoryRatio, 0.1) * offHeapMemory));
auto maxExtendedPartialAggregationMemory =
static_cast<long>((veloxCfg_->get<double>(kMaxExtendedPartialAggregationMemoryRatio, 0.15) * offHeapMemory));
configs[velox::core::QueryConfig::kMaxPartialAggregationMemory] = std::to_string(maxPartialAggregationMemory);
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const std::string kCompressionKind = "spark.io.compression.codec";
const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
const std::string kMaxPartialAggregationMemory = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
const std::string kMaxExtendedPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio";
const std::string kAbandonPartialAggregationMinPct =
Expand Down
Loading