From d158ce1331125a246d6383bb5def75ff0260d399 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Sun, 19 Feb 2017 16:34:09 -0800 Subject: [PATCH 1/3] fix. --- .../spark/sql/sources/BucketedReadSuite.scala | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index d9ddcbd57ca83..1180e19894add 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -240,6 +240,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joinCondition: (DataFrame, DataFrame) => Column, shuffleLeft: Boolean, shuffleRight: Boolean, + numPartitions: Int = 10, sortLeft: Boolean = true, sortRight: Boolean = true): Unit = { withTable("bucketed_table1", "bucketed_table2") { @@ -263,8 +264,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet }.getOrElse(writer) } - withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1") - withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2") + withBucket(df1.repartition(numPartitions).write.format("parquet"), bucketSpecLeft) + .saveAsTable("bucketed_table1") + withBucket(df2.repartition(numPartitions).write.format("parquet"), bucketSpecRight) + .saveAsTable("bucketed_table2") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { @@ -291,10 +294,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // check existence of sort assert( joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft, - s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}") + s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}") assert( joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight, - s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}") + s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") } } } @@ -393,11 +396,31 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joinCondition = joinCondition(Seq("i", "j")), shuffleLeft = false, shuffleRight = false, + numPartitions = 1, sortLeft = false, sortRight = false ) } + test("sort when bucket and sort columns are join keys when having multiple files in a bucket") { + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + // In case of bucketing, its possible to have multiple files belonging to the + // same bucket in a given relation. Each of these files are locally sorted + // but those files combined together are not globally sorted. Given that, + // the RDD partition will not be sorted even if the relation has sort columns set + // Therefore, we still need to keep the Sort in both sides. + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false, + numPartitions = 50, + sortLeft = true, + sortRight = true + ) + } + test("avoid shuffle and sort when sort columns are a super set of join keys") { val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) @@ -407,6 +430,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joinCondition = joinCondition(Seq("i")), shuffleLeft = false, shuffleRight = false, + numPartitions = 1, sortLeft = false, sortRight = false ) @@ -421,6 +445,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joinCondition = joinCondition(Seq("i", "j")), shuffleLeft = false, shuffleRight = false, + numPartitions = 1, sortLeft = false, sortRight = true ) @@ -435,6 +460,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joinCondition = joinCondition(Seq("i", "j")), shuffleLeft = false, shuffleRight = false, + numPartitions = 1, sortLeft = false, sortRight = true ) @@ -482,6 +508,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet }, shuffleLeft = false, shuffleRight = false, + numPartitions = 1, sortLeft = false, sortRight = false ) From f1569bf1a0a3047aef860bde18d8d34f71548886 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Sun, 19 Feb 2017 20:22:13 -0800 Subject: [PATCH 2/3] fix. --- .../spark/sql/sources/BucketedReadSuite.scala | 240 ++++++++++-------- 1 file changed, 129 insertions(+), 111 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 1180e19894add..87bdea39aca6c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -227,6 +227,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") + case class BucketTableTestSpec( + bucketSpec: Option[BucketSpec], + numPartitions: Int = 10, + expectedShuffle: Boolean = true, + expectedSort: Boolean = true) + /** * A helper method to test the bucket read functionality using join. It will save `df1` and `df2` * to hive tables, bucketed or not, according to the given bucket specifics. Next we will join @@ -234,15 +240,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet * exists as user expected according to the `shuffleLeft` and `shuffleRight`. */ private def testBucketing( - bucketSpecLeft: Option[BucketSpec], - bucketSpecRight: Option[BucketSpec], + bucketTableTestSpecLeft: BucketTableTestSpec, + bucketTableTestSpecRight: BucketTableTestSpec, joinType: String = "inner", - joinCondition: (DataFrame, DataFrame) => Column, - shuffleLeft: Boolean, - shuffleRight: Boolean, - numPartitions: Int = 10, - sortLeft: Boolean = true, - sortRight: Boolean = true): Unit = { + joinCondition: (DataFrame, DataFrame) => Column): Unit = { + val BucketTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = + bucketTableTestSpecLeft + val BucketTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = + bucketTableTestSpecRight + withTable("bucketed_table1", "bucketed_table2") { def withBucket( writer: DataFrameWriter[Row], @@ -264,9 +270,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet }.getOrElse(writer) } - withBucket(df1.repartition(numPartitions).write.format("parquet"), bucketSpecLeft) + withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft) .saveAsTable("bucketed_table1") - withBucket(df2.repartition(numPartitions).write.format("parquet"), bucketSpecRight) + withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight) .saveAsTable("bucketed_table2") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", @@ -308,161 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle when join 2 bucketed tables") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 ignore("avoid shuffle when join keys are a super-set of bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when join bucketed table and non-bucketed table") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketTableTestSpecRight = BucketTableTestSpec(None, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = None, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when 2 bucketed tables have different bucket number") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil)) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when 2 bucketed tables have different bucket keys") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil)) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i")), - shuffleLeft = false, - shuffleRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) ) } test("shuffle when join keys are not equal to bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("j")), - shuffleLeft = true, - shuffleRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("j")) ) } test("shuffle when join 2 bucketed tables with bucketing disabled") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true) withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = true, - shuffleRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } } - test("avoid shuffle and sort when bucket and sort columns are join keys") { - val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - numPartitions = 1, - sortLeft = false, - sortRight = false - ) - } - - test("sort when bucket and sort columns are join keys when having multiple files in a bucket") { - val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + test("check sort and shuffle when bucket and sort columns are join keys") { // In case of bucketing, its possible to have multiple files belonging to the // same bucket in a given relation. Each of these files are locally sorted // but those files combined together are not globally sorted. Given that, // the RDD partition will not be sorted even if the relation has sort columns set // Therefore, we still need to keep the Sort in both sides. + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + + val bucketTableTestSpecLeft1 = BucketTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketTableTestSpecRight1 = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketTableTestSpecLeft1, + bucketTableTestSpecRight1, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketTableTestSpecLeft2 = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight2 = BucketTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - numPartitions = 50, - sortLeft = true, - sortRight = true + bucketTableTestSpecLeft2, + bucketTableTestSpecRight2, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketTableTestSpecLeft3 = BucketTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketTableTestSpecRight3 = BucketTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketTableTestSpecLeft3, + bucketTableTestSpecRight3, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketTableTestSpecLeft4 = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight4 = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketTableTestSpecLeft4, + bucketTableTestSpecRight4, + joinCondition = joinCondition(Seq("i", "j")) ) } test("avoid shuffle and sort when sort columns are a super set of join keys") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) + val bucketTableTestSpecLeft = BucketTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight = BucketTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i")), - shuffleLeft = false, - shuffleRight = false, - numPartitions = 1, - sortLeft = false, - sortRight = false + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) ) } test("only sort one side when sort columns are different") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) + val bucketTableTestSpecLeft = BucketTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight = BucketTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - numPartitions = 1, - sortLeft = false, - sortRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only sort one side when sort columns are same but their ordering is different") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) + val bucketTableTestSpecLeft = BucketTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight = BucketTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - numPartitions = 1, - sortLeft = false, - sortRight = true + bucketTableTestSpecLeft, + bucketTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } @@ -496,21 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("SPARK-17698 Join predicates should not contain filter clauses") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) + val bucketTableTestSpecLeft = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketTableTestSpecRight = BucketTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, + bucketTableTestSpecLeft, + bucketTableTestSpecRight, joinType = "fullouter", joinCondition = (left: DataFrame, right: DataFrame) => { val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _) val filterLeft = left("i") === Literal("1") val filterRight = right("i") === Literal("1") joinPredicates && filterLeft && filterRight - }, - shuffleLeft = false, - shuffleRight = false, - numPartitions = 1, - sortLeft = false, - sortRight = false + } ) } From 4b73130d33d2af1e74a688b7e19db0fb5d90f72e Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Sun, 19 Feb 2017 20:35:33 -0800 Subject: [PATCH 3/3] style fix --- .../spark/sql/sources/BucketedReadSuite.scala | 134 +++++++++--------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 87bdea39aca6c..4fc72b9e47597 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -227,7 +227,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") - case class BucketTableTestSpec( + case class BucketedTableTestSpec( bucketSpec: Option[BucketSpec], numPartitions: Int = 10, expectedShuffle: Boolean = true, @@ -240,14 +240,14 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet * exists as user expected according to the `shuffleLeft` and `shuffleRight`. */ private def testBucketing( - bucketTableTestSpecLeft: BucketTableTestSpec, - bucketTableTestSpecRight: BucketTableTestSpec, + bucketedTableTestSpecLeft: BucketedTableTestSpec, + bucketedTableTestSpecRight: BucketedTableTestSpec, joinType: String = "inner", joinCondition: (DataFrame, DataFrame) => Column): Unit = { - val BucketTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = - bucketTableTestSpecLeft - val BucketTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = - bucketTableTestSpecRight + val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = + bucketedTableTestSpecLeft + val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = + bucketedTableTestSpecRight withTable("bucketed_table1", "bucketed_table2") { def withBucket( @@ -314,11 +314,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle when join 2 bucketed tables") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -326,22 +326,22 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 ignore("avoid shuffle when join keys are a super-set of bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when join bucketed table and non-bucketed table") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketTableTestSpecRight = BucketTableTestSpec(None, expectedShuffle = true) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -349,11 +349,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("only shuffle one side when 2 bucketed tables have different bucket number") { val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil)) val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -361,34 +361,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("only shuffle one side when 2 bucketed tables have different bucket keys") { val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil)) val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i")) ) } test("shuffle when join keys are not equal to bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("j")) ) } test("shuffle when join 2 bucketed tables with bucketing disabled") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true) - val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -402,43 +402,43 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // Therefore, we still need to keep the Sort in both sides. val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketTableTestSpecLeft1 = BucketTableTestSpec( + val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketTableTestSpecRight1 = BucketTableTestSpec( + val bucketedTableTestSpecRight1 = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketTableTestSpecLeft1, - bucketTableTestSpecRight1, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, + bucketedTableTestSpecRight = bucketedTableTestSpecRight1, joinCondition = joinCondition(Seq("i", "j")) ) - val bucketTableTestSpecLeft2 = BucketTableTestSpec( + val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight2 = BucketTableTestSpec( + val bucketedTableTestSpecRight2 = BucketedTableTestSpec( bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) testBucketing( - bucketTableTestSpecLeft2, - bucketTableTestSpecRight2, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, + bucketedTableTestSpecRight = bucketedTableTestSpecRight2, joinCondition = joinCondition(Seq("i", "j")) ) - val bucketTableTestSpecLeft3 = BucketTableTestSpec( + val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketTableTestSpecRight3 = BucketTableTestSpec( + val bucketedTableTestSpecRight3 = BucketedTableTestSpec( bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) testBucketing( - bucketTableTestSpecLeft3, - bucketTableTestSpecRight3, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, + bucketedTableTestSpecRight = bucketedTableTestSpecRight3, joinCondition = joinCondition(Seq("i", "j")) ) - val bucketTableTestSpecLeft4 = BucketTableTestSpec( + val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight4 = BucketTableTestSpec( + val bucketedTableTestSpecRight4 = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketTableTestSpecLeft4, - bucketTableTestSpecRight4, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, + bucketedTableTestSpecRight = bucketedTableTestSpecRight4, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -446,13 +446,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle and sort when sort columns are a super set of join keys") { val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) - val bucketTableTestSpecLeft = BucketTableTestSpec( + val bucketedTableTestSpecLeft = BucketedTableTestSpec( bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight = BucketTableTestSpec( + val bucketedTableTestSpecRight = BucketedTableTestSpec( bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i")) ) } @@ -460,13 +460,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("only sort one side when sort columns are different") { val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) - val bucketTableTestSpecLeft = BucketTableTestSpec( + val bucketedTableTestSpecLeft = BucketedTableTestSpec( bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight = BucketTableTestSpec( + val bucketedTableTestSpecRight = BucketedTableTestSpec( bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -474,13 +474,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("only sort one side when sort columns are same but their ordering is different") { val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) - val bucketTableTestSpecLeft = BucketTableTestSpec( + val bucketedTableTestSpecLeft = BucketedTableTestSpec( bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight = BucketTableTestSpec( + val bucketedTableTestSpecRight = BucketedTableTestSpec( bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinCondition = joinCondition(Seq("i", "j")) ) } @@ -515,13 +515,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("SPARK-17698 Join predicates should not contain filter clauses") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) - val bucketTableTestSpecLeft = BucketTableTestSpec( + val bucketedTableTestSpecLeft = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketTableTestSpecRight = BucketTableTestSpec( + val bucketedTableTestSpecRight = BucketedTableTestSpec( bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketTableTestSpecLeft, - bucketTableTestSpecRight, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinType = "fullouter", joinCondition = (left: DataFrame, right: DataFrame) => { val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)