From ea219c06c2a3a7d3465766f7a6789dd19e98b46a Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Fri, 2 Jul 2021 11:21:20 +0800 Subject: [PATCH 1/5] [SPARK-35984][SQL] Config to force applying shuffled hash join --- .../apache/spark/sql/catalyst/optimizer/joins.scala | 4 ++-- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++ .../resources/sql-tests/inputs/postgreSQL/join.sql | 2 +- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 8 ++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 31a48c8cb54cd..1865f859bd1e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -272,14 +272,14 @@ trait JoinSelectionHelper { val buildLeft = if (hintOnly) { hintToShuffleHashJoinLeft(hint) } else { - hintToPreferShuffleHashJoinLeft(hint) || + hintToPreferShuffleHashJoinLeft(hint) || conf.forceApplyShuffledHashJoin || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)) } val buildRight = if (hintOnly) { hintToShuffleHashJoinRight(hint) } else { - hintToPreferShuffleHashJoinRight(hint) || + hintToPreferShuffleHashJoinRight(hint) || conf.forceApplyShuffledHashJoin || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 59a310d1e4f52..8f9a820709e1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -419,6 +419,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val FORCE_APPLY_SHUFFLEDHASHJOIN = buildConf("spark.sql.join.forceApplyShuffledHashJoin") + .internal() + .doc("When true, force applying shuffled hash join even if the table sizes exceed the " + + "threshold. This is for testing/benchmarking only. If this config is set to true, the " + + "value spark.sql.join.perferSortMergejoin will be ignored.") + .version("3.2.0") + .booleanConf + .createWithDefault(false) + val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort") .internal() .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " + @@ -3718,6 +3727,8 @@ class SQLConf extends Serializable with Logging { def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) + def forceApplyShuffledHashJoin: Boolean = getConf(FORCE_APPLY_SHUFFLEDHASHJOIN) + def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql index 183e79ee98a2e..ca018873440f5 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql @@ -13,7 +13,7 @@ --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true ---CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false +--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.conf.forceApplyShuffledHashJoin=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index acbf30089a92e..a7f551d162008 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1394,4 +1394,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(fullJoinDF, Row(100)) } } + + test("SPARK-35984: Config to force applying shuffled hash join") { + val sql = "SELECT * FROM testData JOIN testData2 ON key = a" + assertJoin(sql, classOf[SortMergeJoinExec]) + withSQLConf(SQLConf.FORCE_APPLY_SHUFFLEDHASHJOIN.key -> "true") { + assertJoin(sql, classOf[ShuffledHashJoinExec]) + } + } } From b19772040eefb5938a39e78b114e75c4e338676b Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Fri, 2 Jul 2021 11:55:42 +0800 Subject: [PATCH 2/5] fix test --- .../src/test/resources/sql-tests/inputs/postgreSQL/join.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql index ca018873440f5..dc0b56112d8c5 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql @@ -13,7 +13,7 @@ --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true ---CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.conf.forceApplyShuffledHashJoin=true +--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=true --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY From f3474a20e712b40997a52f6f23605e7a51524f0f Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Sat, 3 Jul 2021 16:59:48 +0800 Subject: [PATCH 3/5] address comments --- .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 9 ++++++--- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 1865f859bd1e9..db91be6dcab08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils /** * Reorder the joins and push all the conditions into join, so that the bottom ones have at least @@ -272,16 +273,18 @@ trait JoinSelectionHelper { val buildLeft = if (hintOnly) { hintToShuffleHashJoinLeft(hint) } else { - hintToPreferShuffleHashJoinLeft(hint) || conf.forceApplyShuffledHashJoin || + hintToPreferShuffleHashJoinLeft(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) && - muchSmaller(left, right)) + muchSmaller(left, right)) || + (Utils.isTesting && conf.forceApplyShuffledHashJoin) } val buildRight = if (hintOnly) { hintToShuffleHashJoinRight(hint) } else { hintToPreferShuffleHashJoinRight(hint) || conf.forceApplyShuffledHashJoin || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) && - muchSmaller(right, left)) + muchSmaller(right, left)) || + (Utils.isTesting && conf.forceApplyShuffledHashJoin) } getBuildSide( canBuildShuffledHashJoinLeft(joinType) && buildLeft, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8f9a820709e1b..6cd64f91ed285 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -423,8 +423,8 @@ object SQLConf { .internal() .doc("When true, force applying shuffled hash join even if the table sizes exceed the " + "threshold. This is for testing/benchmarking only. If this config is set to true, the " + - "value spark.sql.join.perferSortMergejoin will be ignored.") - .version("3.2.0") + s"value ${PREFER_SORTMERGEJOIN.key} will be ignored.") + .version("3.3.0") .booleanConf .createWithDefault(false) From d0dfd8da57067ef4de809f7d541d3847383a395d Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 6 Jul 2021 18:07:13 +0800 Subject: [PATCH 4/5] code clean --- .../spark/sql/catalyst/optimizer/joins.scala | 15 ++++++++++++--- .../org/apache/spark/sql/internal/SQLConf.scala | 11 ----------- .../scala/org/apache/spark/sql/JoinSuite.scala | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index db91be6dcab08..7fff7ae0862ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -276,15 +276,16 @@ trait JoinSelectionHelper { hintToPreferShuffleHashJoinLeft(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)) || - (Utils.isTesting && conf.forceApplyShuffledHashJoin) + (Utils.isTesting && forceApplyShuffledHashJoin(conf)) } val buildRight = if (hintOnly) { hintToShuffleHashJoinRight(hint) } else { - hintToPreferShuffleHashJoinRight(hint) || conf.forceApplyShuffledHashJoin || + hintToPreferShuffleHashJoinRight(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)) || - (Utils.isTesting && conf.forceApplyShuffledHashJoin) + (Utils.isTesting && forceApplyShuffledHashJoin(conf)) + } getBuildSide( canBuildShuffledHashJoinLeft(joinType) && buildLeft, @@ -427,5 +428,13 @@ trait JoinSelectionHelper { private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes } + + /** + * Returns whether a shuffled hash join should be force applied. + * The config key is hard-coded because it's testing only and should not be exposed. + */ + private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = { + conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true" + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6cd64f91ed285..59a310d1e4f52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -419,15 +419,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val FORCE_APPLY_SHUFFLEDHASHJOIN = buildConf("spark.sql.join.forceApplyShuffledHashJoin") - .internal() - .doc("When true, force applying shuffled hash join even if the table sizes exceed the " + - "threshold. This is for testing/benchmarking only. If this config is set to true, the " + - s"value ${PREFER_SORTMERGEJOIN.key} will be ignored.") - .version("3.3.0") - .booleanConf - .createWithDefault(false) - val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort") .internal() .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " + @@ -3727,8 +3718,6 @@ class SQLConf extends Serializable with Logging { def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) - def forceApplyShuffledHashJoin: Boolean = getConf(FORCE_APPLY_SHUFFLEDHASHJOIN) - def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a7f551d162008..abfc19ac6dffd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1398,7 +1398,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-35984: Config to force applying shuffled hash join") { val sql = "SELECT * FROM testData JOIN testData2 ON key = a" assertJoin(sql, classOf[SortMergeJoinExec]) - withSQLConf(SQLConf.FORCE_APPLY_SHUFFLEDHASHJOIN.key -> "true") { + withSQLConf("spark.sql.join.forceApplyShuffledHashJoin" -> "true") { assertJoin(sql, classOf[ShuffledHashJoinExec]) } } From 81fdaae64d6e30cf885c96eea7c1cb3072537bcd Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 6 Jul 2021 23:15:41 +0800 Subject: [PATCH 5/5] move isTesting --- .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 7fff7ae0862ae..d6e2a59de0b78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -276,7 +276,7 @@ trait JoinSelectionHelper { hintToPreferShuffleHashJoinLeft(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)) || - (Utils.isTesting && forceApplyShuffledHashJoin(conf)) + forceApplyShuffledHashJoin(conf) } val buildRight = if (hintOnly) { hintToShuffleHashJoinRight(hint) @@ -284,8 +284,7 @@ trait JoinSelectionHelper { hintToPreferShuffleHashJoinRight(hint) || (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)) || - (Utils.isTesting && forceApplyShuffledHashJoin(conf)) - + forceApplyShuffledHashJoin(conf) } getBuildSide( canBuildShuffledHashJoinLeft(joinType) && buildLeft, @@ -434,7 +433,8 @@ trait JoinSelectionHelper { * The config key is hard-coded because it's testing only and should not be exposed. */ private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = { - conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true" + Utils.isTesting && + conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true" } }