From 07196a8acbf6f0a68f29f96d1eeea74f53bbeb8a Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Fri, 26 Aug 2016 00:00:35 -0700
Subject: [PATCH 1/5] [SPARK-15453] [SQL] Sort Merge Join to use bucketing metadata to optimize query plan

BEFORE

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
hc.sql("DROP TABLE table8").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
hc.sql("DROP TABLE table9").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
hc.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)

== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
   :- 'UnresolvedRelation table8, a
   +- 'UnresolvedRelation table9, b

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#119, j#120, k#121, i#122, j#123, k#124]
+- Join Inner, ((j#120 = j#123) && (k#121 = k#124))
   :- SubqueryAlias a
   :  +- SubqueryAlias table8
   :     +- Relation[i#119,j#120,k#121] orc
   +- SubqueryAlias b
      +- SubqueryAlias table9
         +- Relation[i#122,j#123,k#124] orc

== Optimized Logical Plan ==
Join Inner, ((j#120 = j#123) && (k#121 = k#124))
:- Filter (isnotnull(k#121) && isnotnull(j#120))
:  +- Relation[i#119,j#120,k#121] orc
+- Filter (isnotnull(k#124) && isnotnull(j#123))
   +- Relation[i#122,j#123,k#124] orc

== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct
```

AFTER

```
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
   :- 'UnresolvedRelation `table8`, a
   +- 'UnresolvedRelation `table9`, b

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#48, j#49, k#50, i#51, j#52, k#53]
+- Join Inner, ((j#49 = j#52) && (k#50 = k#53))
   :- SubqueryAlias a
   :  +- SubqueryAlias table8
   :     +- Relation[i#48,j#49,k#50] orc
   +- SubqueryAlias b
      +- SubqueryAlias table9
         +- Relation[i#51,j#52,k#53] orc

== Optimized Logical Plan ==
Join Inner, ((j#49 = j#52) && (k#50 = k#53))
:- Filter (isnotnull(k#50) && isnotnull(j#49))
:  +- Relation[i#48,j#49,k#50] orc
+- Filter (isnotnull(j#52) && isnotnull(k#53))
   +- Relation[i#51,j#52,k#53] orc

== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct
```
---
 .../sql/execution/DataSourceScanExec.scala    | 60 ++++++++++++++-----
 .../org/apache/spark/sql/JoinSuite.scala      | 46 ++++++++++++++
 2 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1a8d0e310aec0..c82bee4b7175b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
@@ -156,24 +156,56 @@ case class FileSourceScanExec(
     false
   }
 
-  override val outputPartitioning: Partitioning = {
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
       None
     }
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-        output.find(_.name == n)
-      }
-      if (bucketColumns.size == spec.bucketColumnNames.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    }.getOrElse {
-      UnknownPartitioning(0)
+    bucketSpec match {
+      case Some(spec) =>
+        val numBuckets = spec.numBuckets
+        val bucketColumns = spec.bucketColumnNames.flatMap { n =>
+          output.find(_.name == n)
+        }
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          val partitioning = HashPartitioning(bucketColumns, numBuckets)
+
+          val sortOrder = if (spec.sortColumnNames.nonEmpty) {
+            // In case of bucketing, its possible to have multiple files belonging to the
+            // same bucket in a given relation. Each of these files are locally sorted
+            // but those files combined together are not globally sorted. Given that,
+            // the RDD partition will not be sorted even if the relation has sort columns set
+            // Current solution is to check if all the buckets have a single file in it
+
+            val files =
+              relation.location.listFiles(partitionFilters).flatMap(partition => partition.files)
+            val bucketToFilesGrouping =
+              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+            if (singleFilePartitions) {
+              def toAttribute(colName: String): Attribute =
+                output.find(_.name == colName).getOrElse {
+                  throw new AnalysisException(s"Could not find sort column $colName for " +
+                    s"relation ${metastoreTableIdentifier.get.toString} in its existing " +
+                    s"columns : (${output.map(_.name).mkString(", ")})")
+                }
+              // TODO Currently Spark does not support writing columns sorting in descending order
+              // so using Ascending order. This can be fixed in future
+              spec.sortColumnNames.map(c => SortOrder(toAttribute(c), Ascending))
+            } else {
+              Nil
+            }
+          } else {
+            Nil
+          }
+          (partitioning, sortOrder)
+        } else {
+          (UnknownPartitioning(0), Nil)
+        }
+      case _ =>
+        (UnknownPartitioning(0), Nil)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44889d92ee306..561de3fecd917 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -21,6 +21,7 @@ import scala.language.existentials
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -61,6 +62,51 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-15453 : Sort Merge join on bucketed + sorted tables should not add `sort` step " +
+    "if the join predicates are subset of the sorted columns of the tables") {
+    withTable("SPARK_15453_table_a", "SPARK_15453_table_b") {
+      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
+        val df =
+          (0 until 8)
+            .map(i => (i, i * 2, i.toString))
+            .toDF("i", "j", "k")
+            .coalesce(1)
+        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_a")
+        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_b")
+
+        val query = """
+          |SELECT *
+          |FROM
+          |  SPARK_15453_table_a a
+          |JOIN
+          |  SPARK_15453_table_b b
+          |ON a.j=b.j AND
+          |   a.k=b.k
+        """.stripMargin
+        val joinDF = sql(query)
+
+        val executedPlan = joinDF.queryExecution.executedPlan
+        val operators = executedPlan.collect {
+          case j: SortMergeJoinExec => j
+          case j: SortExec => j
+        }
+        assert(operators.size === 1)
+        assert(operators.head.getClass == classOf[SortMergeJoinExec])
+
+        checkAnswer(joinDF,
+          Row(0, 0, "0", 0, 0, "0") ::
+            Row(1, 2, "1", 1, 2, "1") ::
+            Row(2, 4, "2", 2, 4, "2") ::
+            Row(3, 6, "3", 3, 6, "3") ::
+            Row(4, 8, "4", 4, 8, "4") ::
+            Row(5, 10, "5", 5, 10, "5") ::
+            Row(6, 12, "6", 6, 12, "6") ::
+            Row(7, 14, "7", 7, 14, "7") :: Nil)
+      }
+    }
+  }
+
+
   test("join operator selection") {
     spark.sharedState.cacheManager.clearCache()
 

From 568b742a7087e39c39a47caac300aad74174ec7d Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Sat, 3 Sep 2016 06:08:20 -0700
Subject: [PATCH 2/5] Moving listing files to a lazy val

---
 .../sql/execution/DataSourceScanExec.scala | 30 +++++++++----------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index c82bee4b7175b..ca18148386cad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,6 +155,8 @@ case class FileSourceScanExec(
     false
   }
 
+  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
@@ -165,9 +166,15 @@ case class FileSourceScanExec(
     }
     bucketSpec match {
       case Some(spec) =>
         val numBuckets = spec.numBuckets
-        val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-          output.find(_.name == n)
-        }
+
+        def toAttribute(colName: String, columnType: String): Attribute =
+          output.find(_.name == colName).getOrElse {
+            throw new AnalysisException(s"Could not find $columnType column $colName for " +
+              s"relation ${metastoreTableIdentifier.get.toString} in its existing " +
+              s"columns : (${output.map(_.name).mkString(", ")})")
+          }
+
+        val bucketColumns = spec.bucketColumnNames.flatMap(n => Some(toAttribute(n, "bucketing")))
         if (bucketColumns.size == spec.bucketColumnNames.size) {
           val partitioning = HashPartitioning(bucketColumns, numBuckets)
@@ -178,22 +185,15 @@ case class FileSourceScanExec(
             // the RDD partition will not be sorted even if the relation has sort columns set
             // Current solution is to check if all the buckets have a single file in it
 
-            val files =
-              relation.location.listFiles(partitionFilters).flatMap(partition => partition.files)
+            val files = selectedPartitions.flatMap(partition => partition.files)
             val bucketToFilesGrouping =
               files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
             val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
             if (singleFilePartitions) {
-              def toAttribute(colName: String): Attribute =
-                output.find(_.name == colName).getOrElse {
-                  throw new AnalysisException(s"Could not find sort column $colName for " +
-                    s"relation ${metastoreTableIdentifier.get.toString} in its existing " +
-                    s"columns : (${output.map(_.name).mkString(", ")})")
-                }
               // TODO Currently Spark does not support writing columns sorting in descending order
               // so using Ascending order. This can be fixed in future
-              spec.sortColumnNames.map(c => SortOrder(toAttribute(c), Ascending))
+              spec.sortColumnNames.map(c => SortOrder(toAttribute(c, "sort"), Ascending))
             } else {
               Nil
             }
@@ -219,8 +219,6 @@ case class FileSourceScanExec(
     "InputPaths" -> relation.location.paths.mkString(", "))
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    val selectedPartitions = relation.location.listFiles(partitionFilters)
-
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,

From 445549b81c97f2c3024bbaff97bbca371cb37558 Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Thu, 8 Sep 2016 22:10:06 -0700
Subject: [PATCH 3/5] review comments

---
 .../sql/execution/DataSourceScanExec.scala | 41 +++++++++++++------
 1 file changed, 28 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ca18148386cad..f25b3ffd4839f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -165,20 +165,35 @@ case class FileSourceScanExec(
     }
     bucketSpec match {
       case Some(spec) =>
-        val numBuckets = spec.numBuckets
-
-        def toAttribute(colName: String, columnType: String): Attribute =
-          output.find(_.name == colName).getOrElse {
-            throw new AnalysisException(s"Could not find $columnType column $colName for " +
-              s"relation ${metastoreTableIdentifier.get.toString} in its existing " +
-              s"columns : (${output.map(_.name).mkString(", ")})")
-          }
-
-        val bucketColumns = spec.bucketColumnNames.flatMap(n => Some(toAttribute(n, "bucketing")))
+        // For bucketed columns:
+        // -----------------------
+        // `HashPartitioning` would be used only when:
+        //  1. ALL the bucketing columns are being read from the table
+        //
+        // For sorted columns:
+        // ---------------------
+        // Sort ordering should be used when ALL these criteria's match:
+        //  1. `HashPartitioning` is being used
+        //  2. A prefix (or all) of the sort columns are being read from the table.
+        //
+        // Sort ordering would be over the prefix subset of `sort columns` being read
+        // from the table.
+        // eg.
+        // Assume (col0, col2, col3) are the columns read from the table
+        // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+        // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+        // above
+
+        def toAttribute(colName: String): Option[Attribute] =
+          output.find(_.name == colName)
+
+        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
         if (bucketColumns.size == spec.bucketColumnNames.size) {
-          val partitioning = HashPartitioning(bucketColumns, numBuckets)
+          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+          val sortColumns =
+            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
 
-          val sortOrder = if (spec.sortColumnNames.nonEmpty) {
+          val sortOrder = if (sortColumns.nonEmpty) {
             // In case of bucketing, its possible to have multiple files belonging to the
             // same bucket in a given relation. Each of these files are locally sorted
             // but those files combined together are not globally sorted. Given that,
@@ -193,7 +208,7 @@ case class FileSourceScanExec(
             if (singleFilePartitions) {
               // TODO Currently Spark does not support writing columns sorting in descending order
               // so using Ascending order. This can be fixed in future
-              spec.sortColumnNames.map(c => SortOrder(toAttribute(c, "sort"), Ascending))
+              sortColumns.map(attribute => SortOrder(attribute, Ascending))
             } else {
               Nil
             }

From 7db6c1031f401a521f84c3080cce9cc994484ac6 Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Fri, 9 Sep 2016 09:13:28 -0700
Subject: [PATCH 4/5] More comprehensive tests

---
 .../org/apache/spark/sql/JoinSuite.scala      | 45 ------------
 .../spark/sql/sources/BucketedReadSuite.scala | 63 ++++++++++++++++++-
 2 files changed, 61 insertions(+), 47 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 561de3fecd917..f84a42fac21c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -62,51 +62,6 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-15453 : Sort Merge join on bucketed + sorted tables should not add `sort` step " +
-    "if the join predicates are subset of the sorted columns of the tables") {
-    withTable("SPARK_15453_table_a", "SPARK_15453_table_b") {
-      withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
-        val df =
-          (0 until 8)
-            .map(i => (i, i * 2, i.toString))
-            .toDF("i", "j", "k")
-            .coalesce(1)
-        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_a")
-        df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_b")
-
-        val query = """
-          |SELECT *
-          |FROM
-          |  SPARK_15453_table_a a
-          |JOIN
-          |  SPARK_15453_table_b b
-          |ON a.j=b.j AND
-          |   a.k=b.k
-        """.stripMargin
-        val joinDF = sql(query)
-
-        val executedPlan = joinDF.queryExecution.executedPlan
-        val operators = executedPlan.collect {
-          case j: SortMergeJoinExec => j
-          case j: SortExec => j
-        }
-        assert(operators.size === 1)
-        assert(operators.head.getClass == classOf[SortMergeJoinExec])
-
-        checkAnswer(joinDF,
-          Row(0, 0, "0", 0, 0, "0") ::
-            Row(1, 2, "1", 1, 2, "1") ::
-            Row(2, 4, "2", 2, 4, "2") ::
-            Row(3, 6, "3", 3, 6, "3") ::
-            Row(4, 8, "4", 4, 8, "4") ::
-            Row(5, 10, "5", 5, 10, "5") ::
-            Row(6, 12, "6", 6, 12, "6") ::
-            Row(7, 14, "7", 7, 14, "7") :: Nil)
-      }
-    }
-  }
-
-
   test("join operator selection") {
     spark.sharedState.cacheManager.clearCache()
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index ca2ec9f6a5ede..3ff85176de10e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       bucketSpecRight: Option[BucketSpec],
       joinColumns: Seq[String],
       shuffleLeft: Boolean,
-      shuffleRight: Boolean): Unit = {
+      shuffleRight: Boolean,
+      sortLeft: Boolean = true,
+      sortRight: Boolean = true): Unit = {
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
           writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
             spec.numBuckets,
             spec.bucketColumnNames.head,
             spec.bucketColumnNames.tail: _*)
+
+          if (spec.sortColumnNames.nonEmpty) {
+            writer.sortBy(
+              spec.sortColumnNames.head,
+              spec.sortColumnNames.tail: _*
+            )
+          } else {
+            writer
+          }
         }.getOrElse(writer)
       }
 
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
         val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
 
+        // check existence of shuffle
         assert(
           joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
           s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
         assert(
           joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
           s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+        // check existence of sort
+        assert(
+          joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+          s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+        assert(
+          joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+          s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
       }
     }
   }
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
+  test("avoid shuffle and sort when bucket and sort columns are join keys") {
+    val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    testBucketing(
+      bucketSpec, bucketSpec, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("avoid shuffle and sort when sort columns are a super set of join keys") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("only sort one side when sort columns are different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
+  test("only sort one side when sort columns are same but their ordering is different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
   test("avoid shuffle when grouping keys are equal to bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")

From 070c24994747c0479fb2520774ede27ff1cf8cac Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Fri, 9 Sep 2016 10:34:27 -0700
Subject: [PATCH 5/5] removed unused import

---
 sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index f84a42fac21c6..44889d92ee306 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -21,7 +21,6 @@ import scala.language.existentials
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
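
For reference, a minimal usage sketch of the behaviour this series enables. It is not part of the patch; the table names, the local session setup, and the config value are illustrative only, and it assumes a Spark 2.x build with this series applied and bucketing enabled (the default).

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for the sketch; any session with the default catalog works.
val spark = SparkSession.builder().master("local[*]").appName("SPARK-15453-sketch").getOrCreate()
import spark.implicits._

// Disable broadcast joins so the planner picks a sort merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

// coalesce(1) keeps a single file per bucket; the patch only exposes a bucketed table's
// sort ordering when every bucket is backed by at most one file.
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("demo_a")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("demo_b")

// Both sides are bucketed into 8 buckets and sorted on (j, k), which are exactly the join
// keys, so the physical plan should show *SortMergeJoin directly over the two FileScans,
// with no Exchange and no Sort on either side.
spark.sql("SELECT * FROM demo_a a JOIN demo_b b ON a.j = b.j AND a.k = b.k").explain()
```

If only a prefix of a table's sort columns appears in the join keys, the scan still reports that prefix as its output ordering (per the rules documented in PATCH 3), so the planner may still skip the per-partition sort on that side.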