diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f3df4b522c..ef0250babc 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1688,6 +1688,17 @@ impl PhysicalPlanner { let left = Arc::clone(&join_params.left.native_plan); let right = Arc::clone(&join_params.right.native_plan); + // Null-aware anti-join must run in CollectLeft mode. In Partitioned mode + // each partition only sees per-partition null/emptiness state, which can + // produce wrong NOT IN results across partitions. DataFusion's JoinSelection + // rewrites null-aware joins to CollectLeft for this reason, but Comet + // executes the physical plan directly so we must pick the mode here. + let partition_mode = if join.null_aware_anti_join { + PartitionMode::CollectLeft + } else { + PartitionMode::Partitioned + }; + let hash_join = Arc::new(HashJoinExec::try_new( left, right, @@ -1695,20 +1706,18 @@ impl PhysicalPlanner { join_params.join_filter, &join_params.join_type, None, - PartitionMode::Partitioned, + partition_mode, // null doesn't equal to null in Spark join key. If the join key is // `EqualNullSafe`, Spark will rewrite it during planning. NullEquality::NullEqualsNothing, - // null_aware is for null-aware anti joins (NOT IN subqueries). - // NullEquality controls whether NULL = NULL in join keys generally, - // while null_aware changes anti-join semantics so any NULL changes - // the entire result. Spark doesn't use this path (it rewrites - // EqualNullSafe at plan time), so false is correct. - false, + join.null_aware_anti_join, )?); - // If the hash join is build right, we need to swap the left and right - if join.build_side == BuildSide::BuildLeft as i32 { + // If the hash join is build right, we need to swap the left and right. + // Exception: null-aware anti-join requires LeftAnti + build-right semantics + // (which matches DataFusion's default), and swap_inputs would turn LeftAnti + // into RightAnti, which DataFusion rejects with null_aware=true. + if join.build_side == BuildSide::BuildLeft as i32 || join.null_aware_anti_join { Ok(( scans, shuffle_scans, @@ -4038,6 +4047,7 @@ mod tests { join_type: 0, condition: None, build_side: 0, + null_aware_anti_join: false, })), }; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index fb438b26a4..1efa1dcc17 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -341,6 +341,9 @@ message HashJoin { JoinType join_type = 3; optional spark.spark_expression.Expr condition = 4; BuildSide build_side = 5; + // True for BroadcastHashJoinExec null-aware anti-joins (NOT IN subquery semantics). + // When true, any null in the build side suppresses all left rows. + bool null_aware_anti_join = 6; } message SortMergeJoin { diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index a4d31a59ac..6a408ee745 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -20,7 +20,7 @@ package org.apache.comet.rules import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper} -import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} +import org.apache.spark.sql.catalyst.plans.LeftSemi import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} @@ -67,8 +67,7 @@ object RewriteJoin extends JoinSelectionHelper { def rewrite(plan: SparkPlan): SparkPlan = plan match { case smj: SortMergeJoinExec => getSmjBuildSide(smj) match { - case Some(BuildRight) if smj.joinType == LeftAnti || smj.joinType == LeftSemi => - // LeftAnti https://github.com/apache/datafusion-comet/issues/457 + case Some(BuildRight) if smj.joinType == LeftSemi => // LeftSemi https://github.com/apache/datafusion-comet/issues/2667 withInfo( smj, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 5f7e91529d..f48bf45440 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1710,10 +1710,10 @@ trait CometHashJoin { return None } - if (join.buildSide == BuildRight && join.joinType == LeftAnti) { - // https://github.com/apache/datafusion-comet/issues/457 - withInfo(join, "BuildRight with LeftAnti is not supported") - return None + // Only BroadcastHashJoinExec can be null-aware (NOT IN subqueries). + val isNullAwareAntiJoin = join match { + case bhj: BroadcastHashJoinExec => bhj.isNullAwareAntiJoin + case _ => false } val condition = join.condition.map { cond => @@ -1754,6 +1754,7 @@ trait CometHashJoin { .addAllRightJoinKeys(rightKeys.map(_.get).asJava) .setBuildSide(if (join.buildSide == BuildLeft) OperatorOuterClass.BuildSide.BuildLeft else OperatorOuterClass.BuildSide.BuildRight) + .setNullAwareAntiJoin(isNullAwareAntiJoin) condition.foreach(joinBuilder.setCondition) Some(builder.setHashJoin(joinBuilder).build()) } else { diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt index d9dce96ba0..41b9fb6b4e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_datafusion/extended.txt @@ -1,60 +1,56 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 50 out of 53 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt index 726fb6b283..e553dcb0a5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q69.native_iceberg_compat/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : : +- SubqueryBroadcast - : : : : : +- BroadcastExchange - : : : : : +- CometNativeColumnarToRow - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : : +- SubqueryBroadcast + : : : : : +- BroadcastExchange + : : : : : +- CometNativeColumnarToRow + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 34 out of 53 eligible operators (64%). Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 49 out of 53 eligible operators (92%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt index b95d260929..c965be9cc3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_datafusion/extended.txt @@ -1,54 +1,52 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.store_sales - : : : : +- CometSubqueryBroadcast - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.store_sales + : : : : +- CometSubqueryBroadcast + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -68,4 +66,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). Final plan contains 3 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 63 out of 66 eligible operators (95%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt index 6218729c98..cb4d06350b 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q87.native_iceberg_compat/extended.txt @@ -1,55 +1,53 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -69,4 +67,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 54 out of 66 eligible operators (81%). Final plan contains 4 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 62 out of 66 eligible operators (93%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt index d9dce96ba0..41b9fb6b4e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_datafusion/extended.txt @@ -1,60 +1,56 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 50 out of 53 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt index 726fb6b283..e553dcb0a5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q69.native_iceberg_compat/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : : +- SubqueryBroadcast - : : : : : +- BroadcastExchange - : : : : : +- CometNativeColumnarToRow - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : : +- SubqueryBroadcast + : : : : : +- BroadcastExchange + : : : : : +- CometNativeColumnarToRow + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 34 out of 53 eligible operators (64%). Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 49 out of 53 eligible operators (92%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt index b95d260929..c965be9cc3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_datafusion/extended.txt @@ -1,54 +1,52 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.store_sales - : : : : +- CometSubqueryBroadcast - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.store_sales + : : : : +- CometSubqueryBroadcast + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -68,4 +66,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). Final plan contains 3 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 63 out of 66 eligible operators (95%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt index 6218729c98..cb4d06350b 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q87.native_iceberg_compat/extended.txt @@ -1,55 +1,53 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -69,4 +67,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 54 out of 66 eligible operators (81%). Final plan contains 4 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 62 out of 66 eligible operators (93%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt index d9dce96ba0..41b9fb6b4e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_datafusion/extended.txt @@ -1,60 +1,56 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 50 out of 53 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt index 726fb6b283..e553dcb0a5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.native_iceberg_compat/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : : +- SubqueryBroadcast - : : : : : +- BroadcastExchange - : : : : : +- CometNativeColumnarToRow - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : : +- SubqueryBroadcast + : : : : : +- BroadcastExchange + : : : : : +- CometNativeColumnarToRow + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics -Comet accelerated 34 out of 53 eligible operators (64%). Final plan contains 6 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 49 out of 53 eligible operators (92%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt index b95d260929..c965be9cc3 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_datafusion/extended.txt @@ -1,54 +1,52 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.store_sales - : : : : +- CometSubqueryBroadcast - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.store_sales + : : : : +- CometSubqueryBroadcast + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -68,4 +66,4 @@ HashAggregate +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer -Comet accelerated 55 out of 66 eligible operators (83%). Final plan contains 3 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 63 out of 66 eligible operators (95%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt index 6218729c98..cb4d06350b 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.native_iceberg_compat/extended.txt @@ -1,55 +1,53 @@ -HashAggregate -+- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported] - : :- CometNativeColumnarToRow - : : +- CometHashAggregate - : : +- CometExchange - : : +- CometHashAggregate - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales - : : : : +- SubqueryBroadcast - : : : : +- BroadcastExchange - : : : : +- CometNativeColumnarToRow - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometHashAggregate - : +- CometExchange - : +- CometHashAggregate - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometFilter - : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometBroadcastHashJoin + : :- CometHashAggregate + : : +- CometExchange + : : +- CometHashAggregate + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales + : : : : +- SubqueryBroadcast + : : : : +- BroadcastExchange + : : : : +- CometNativeColumnarToRow + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + : +- CometBroadcastExchange + : +- CometHashAggregate + : +- CometExchange + : +- CometHashAggregate + : +- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometFilter + : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer + +- CometBroadcastExchange +- CometHashAggregate +- CometExchange +- CometHashAggregate @@ -69,4 +67,4 @@ HashAggregate +- CometFilter +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer -Comet accelerated 54 out of 66 eligible operators (81%). Final plan contains 4 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 62 out of 66 eligible operators (93%). Final plan contains 2 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 49fbe10c30..d73ddd29cc 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -190,8 +190,66 @@ class CometJoinSuite extends CometTestBase { val df8 = left.join(right, left("_2") === right("_1"), "leftsemi") checkSparkAnswerAndOperator(df8) - // DataFusion HashJoin LeftAnti has bugs in handling nulls and is disabled for now. - // left.join(right, left("_2") === right("_1"), "leftanti") + val df9 = left.join(right, left("_2") === right("_1"), "leftanti") + checkSparkAnswerAndOperator(df9) + } + } + } + } + + test("BroadcastHashJoin with LeftAnti and NOT IN subquery (null-aware)") { + withSQLConf( + SQLConf.PREFER_SORTMERGEJOIN.key -> "false", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "10485760", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10485760") { + // Right side has no NULL: regular anti-semantics + withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { + withParquetTable((0 until 5).map(i => (i, i + 100)), "tbl_b") { + val df = sql("SELECT * FROM tbl_a WHERE _2 NOT IN (SELECT _1 FROM tbl_b)") + checkSparkAnswerAndOperator(df) + } + } + + // Right side contains NULL: null-aware should suppress all left rows + withParquetTable(Seq[(Int, Integer)]((1, 1), (2, 2), (3, 3)), "tbl_a") { + withParquetTable(Seq[(Integer, Int)]((1, 100), (null, 200)), "tbl_b") { + val df = sql("SELECT * FROM tbl_a WHERE _2 NOT IN (SELECT _1 FROM tbl_b)") + checkSparkAnswerAndOperator(df) + } + } + + // Left side has NULL values: NOT IN filters them out (NULL vs anything is NULL) + withParquetTable(Seq[(Int, Integer)]((1, 1), (2, null), (3, 3)), "tbl_a") { + withParquetTable(Seq[(Integer, Int)]((2, 100), (4, 200)), "tbl_b") { + val df = sql("SELECT * FROM tbl_a WHERE _2 NOT IN (SELECT _1 FROM tbl_b)") + checkSparkAnswerAndOperator(df) + } + } + } + } + + test("BroadcastHashJoin with LeftAnti (non-null-aware)") { + withSQLConf( + SQLConf.PREFER_SORTMERGEJOIN.key -> "false", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "10485760", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10485760") { + withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { + withParquetTable((0 until 5).map(i => (i, i + 100)), "tbl_b") { + // BROADCAST(tbl_b) forces tbl_b as build-right side + val df = sql( + "SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a LEFT ANTI JOIN tbl_b " + + "ON tbl_a._2 = tbl_b._1") + checkSparkAnswerAndOperator(df) + } + } + + // With NULL values on both sides - non-null-aware semantics: NULL keys don't match anything + withParquetTable(Seq[(Int, Integer)]((1, 1), (2, null), (3, 3)), "tbl_a") { + withParquetTable(Seq[(Integer, Int)]((1, 100), (null, 200)), "tbl_b") { + val df = sql( + "SELECT /*+ BROADCAST(tbl_b) */ * FROM tbl_a LEFT ANTI JOIN tbl_b " + + "ON tbl_a._2 = tbl_b._1") + checkSparkAnswerAndOperator(df) } } }