14 changes: 14 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -427,6 +427,20 @@ object CometConf extends ShimCometConf {
"The maximum number of columns to hash for round robin partitioning must be non-negative.")
.createWithDefault(0)

val COMET_EXEC_SHUFFLE_REVERT_REDUNDANT_COLUMNAR_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.revertRedundantColumnar.enabled")
.category(CATEGORY_SHUFFLE)
.doc(
"When enabled, Comet reverts a `CometShuffleExchangeExec` with `CometColumnarShuffle` " +
"back to Spark's `ShuffleExchangeExec` when both its parent and child are non-Comet " +
"hash aggregate operators. This avoids a redundant " +
"row -> Arrow -> shuffle -> Arrow -> row conversion when no Comet operator on either " +
"side can consume columnar output. Disable to keep Comet columnar shuffle even in " +
"that case, which preserves Comet's off-heap shuffle memory accounting at the cost of " +
"the extra conversion.")
.booleanConf
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
18 changes: 18 additions & 0 deletions docs/source/user-guide/latest/tuning.md
@@ -154,6 +154,24 @@ partitioning keys. Columns that are not partitioning keys may contain complex ty
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
`SinglePartitioning`. This shuffle implementation supports complex data types as partitioning keys.

#### Automatic Revert to Spark Shuffle

When a Comet columnar shuffle ends up between two non-Comet operators (for example, a partial/final hash aggregate
pair that Comet could not convert), Comet reverts it to Spark's built-in shuffle. Keeping columnar shuffle between
two row-based operators would add `row -> Arrow -> shuffle -> Arrow -> row` conversions with no Comet consumer on
either side to benefit from columnar output.

This shifts the affected shuffles from Comet's off-heap memory pool back to the JVM execution memory pool. Clusters
tuned for a small JVM heap may see `ExternalSorter` spills on queries where this revert fires. Shuffle I/O may also
grow marginally because Spark's row-based serializer generally compresses less well than Comet's Arrow IPC format.

Each revert is logged at `INFO` level on the driver as `Reverting Comet columnar shuffle to Spark shuffle between
<parent> and <child>`, which lets you correlate any unexpected behavior with this optimization.

This optimization is enabled by default and can be disabled by setting
`spark.comet.exec.shuffle.revertRedundantColumnar.enabled=false`, in which case Comet will keep the columnar shuffle
even when both its parent and child are non-Comet operators.
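For example, the optimization can be turned off for a single job at submit time (a sketch: the config key is the one documented above; the application jar name is illustrative):

```shell
# Keep Comet columnar shuffle even between two non-Comet aggregates.
spark-submit \
  --conf spark.comet.exec.shuffle.revertRedundantColumnar.enabled=false \
  your-app.jar
```

The same key can typically also be set at runtime with `spark.conf.set(...)` before the affected query runs, since the rule reads the configuration when each plan is transformed.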

### Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
100 changes: 88 additions & 12 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -89,6 +89,13 @@ object CometExecRule {

val allExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = nativeExecs ++ sinks

/**
* Tag set on a `ShuffleExchangeExec` that should be left as a plain Spark shuffle rather than
* wrapped in `CometShuffleExchangeExec`. See `tagRedundantColumnarShuffle`.
*/
val SKIP_COMET_SHUFFLE_TAG: org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit] =
org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit]("comet.skipCometShuffle")

}

/**
@@ -100,19 +100,78 @@ case class CometExecRule(session: SparkSession)

private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

/**
* Revert any `CometShuffleExchangeExec` with `CometColumnarShuffle` whose parent and child are
* both non-Comet `HashAggregateExec` / `ObjectHashAggregateExec` operators back to the original
* Spark `ShuffleExchangeExec`. This is the partial-final-aggregate pattern where Comet couldn't
* convert either aggregate; keeping a columnar shuffle between them only adds
* row->arrow->shuffle->arrow->row conversion overhead with no Comet consumer on either side.
* See https://github.com/apache/datafusion-comet/issues/4004.
*
* The match is intentionally narrow (both sides must be row-based aggregates that remained JVM
* after the main transform pass). Running the revert post-transform means we only fire when the
* main conversion already decided to keep both aggregates JVM - we never create the dangerous
* mixed mode where a Comet partial feeds a JVM final (see issue #1389).
*
* Correctness depends on running as part of `preColumnarTransitions`: if the revert ran after
* Spark inserted `ColumnarToRowExec` between the aggregate and the columnar shuffle, the
* pattern would no longer match (the shuffle would be separated from the aggregate by the
* transition) and the unnecessary conversion could not be eliminated.
*
* The reverted shuffle is tagged with `SKIP_COMET_SHUFFLE_TAG` so both the AQE
* `QueryStagePrepRule` pass and the `ColumnarRule` `preColumnarTransitions` pass leave it alone
* on re-entry - AQE in particular re-runs the rule on each stage in isolation, where the outer
* aggregate context is no longer visible and the shuffle would otherwise be re-wrapped as a
* Comet columnar shuffle.
*/
Contributor:

Maybe also add that correctness depends on running as preColumnarTransitions (before Spark inserts ColumnarToRowExec), and that the SKIP_COMET_SHUFFLE_TAG guards against re-conversion during both AQE QueryStagePrepRule and ColumnarRule passes.

Member Author:

Expanded the docstring in 2241ccb to call out the preColumnarTransitions correctness dependency and the SKIP_COMET_SHUFFLE_TAG guarding both the AQE QueryStagePrepRule and ColumnarRule re-entries.

private def revertRedundantColumnarShuffle(plan: SparkPlan): SparkPlan = {
Contributor:

Thanks for the optimization @andygrove
I guess this is the first time that a CometShuffleExchangeExec is reverted back to a plain ShuffleExchangeExec.

The two shuffle paths use different memory systems:

  • Comet columnar shuffle uses Comet's own memory pool (off-heap).
  • Spark vanilla shuffle uses the JVM execution memory pool, with spills managed by ExternalSorter.

Users who have tuned their clusters for Comet (smaller JVM heap) could see unexpected spills after this change, since it shifts shuffle memory pressure back to the JVM.
Additionally, Comet's Arrow IPC columnar format typically compresses better than Spark's row-based UnsafeRowSerializer path, so shuffle I/O may also increase.
It would be good to document or log when a shuffle is reverted so users can correlate any unexpected behavior with this optimization.

Member Author:

Thanks for the feedback @karuppayya. I'm assuming the cost of doing two transitions (r2c then c2r) would outweigh the benefits of using Comet shuffle? I agree that it would be worth adding documentation.

Contributor:

I agree that this optimization will improve performance and compute efficiency.

My main concern is determining the best recommendation for users to tune memory, particularly since they cannot explicitly disable it.

Also, could this be a separate rule in itself, applied only in org.apache.comet.CometSparkSessionExtensions.CometExecColumnar#postColumnarTransitions?

Member Author:

@karuppayya I added a section to the tuning guide and also added a config to enable/disable this optimization. Could you take another look?

I'm open to refactoring to create different rules, but would prefer to wait for some current DPP work to finish first, and also some work for fixing planning issues with mixed partial/final aggregates.

def isAggregate(p: SparkPlan): Boolean =
p.isInstanceOf[HashAggregateExec] || p.isInstanceOf[ObjectHashAggregateExec]

def isRedundantShuffle(child: SparkPlan): Boolean = child match {
case s: CometShuffleExchangeExec =>
s.shuffleType == CometColumnarShuffle && isAggregate(s.child)
case _ => false
}

plan.transform {
case op if isAggregate(op) && op.children.exists(isRedundantShuffle) =>
val newChildren = op.children.map {
case s: CometShuffleExchangeExec
if s.shuffleType == CometColumnarShuffle && isAggregate(s.child) =>
val reverted =
s.originalPlan.withNewChildren(Seq(s.child)).asInstanceOf[ShuffleExchangeExec]
reverted.setTagValue(CometExecRule.SKIP_COMET_SHUFFLE_TAG, ())
logInfo(
"Reverting Comet columnar shuffle to Spark shuffle between " +
s"${op.getClass.getSimpleName} and ${s.child.getClass.getSimpleName} " +
"(no Comet operator on either side to consume columnar output)")
reverted
case other => other
}
op.withNewChildren(newChildren)
}
}

private def shouldSkipCometShuffle(s: ShuffleExchangeExec): Boolean =
s.getTagValue(CometExecRule.SKIP_COMET_SHUFFLE_TAG).isDefined

private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
plan.transformUp { case s: ShuffleExchangeExec =>
CometShuffleExchangeExec.shuffleSupported(s) match {
case Some(CometNativeShuffle) =>
// Switch to use Decimal128 regardless of precision, since Arrow native execution
// doesn't support Decimal32 and Decimal64 yet.
conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
case Some(CometColumnarShuffle) =>
CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
case None =>
s
}
plan.transformUp {
case s: ShuffleExchangeExec if shouldSkipCometShuffle(s) =>
s
case s: ShuffleExchangeExec =>
CometShuffleExchangeExec.shuffleSupported(s) match {
case Some(CometNativeShuffle) =>
// Switch to use Decimal128 regardless of precision, since Arrow native execution
// doesn't support Decimal32 and Decimal64 yet.
conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
case Some(CometColumnarShuffle) =>
CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
case None =>
s
}
}
}

@@ -261,6 +327,9 @@ case class CometExecRule(session: SparkSession)
case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
convertToComet(s, CometExchangeSink).getOrElse(s)

case s: ShuffleExchangeExec if shouldSkipCometShuffle(s) =>
s

case s: ShuffleExchangeExec =>
convertToComet(s, CometShuffleExchangeExec).getOrElse(s)

@@ -464,6 +533,13 @@ case class CometExecRule(session: SparkSession)
case CometScanWrapper(_, s) => s
}

// Revert CometColumnarShuffle to Spark's ShuffleExchangeExec when both its parent and child
// are non-Comet HashAggregate/ObjectHashAggregate operators that remained JVM after the main
// transform pass. See https://github.com/apache/datafusion-comet/issues/4004.
if (CometConf.COMET_EXEC_SHUFFLE_REVERT_REDUNDANT_COLUMNAR_ENABLED.get()) {
newPlan = revertRedundantColumnarShuffle(newPlan)
}

// Set up logical links
newPlan = newPlan.transform {
case op: CometExec =>
@@ -1,62 +1,61 @@
TakeOrderedAndProject
+- HashAggregate
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometNativeScan parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometNativeScan parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometNativeScan parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometNativeScan parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics

Comet accelerated 36 out of 54 eligible operators (66%). Final plan contains 6 transitions between Spark and Comet.
Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet.