From 2aa29f3e506131bc83bee9e1368f34b4f445b196 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 6 Sep 2023 13:05:28 -0700 Subject: [PATCH 1/2] initial commit --- .../datasources/v2/DataSourceV2ScanExecBase.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 94667fbd00c18..936dbd2a153cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { // also sort the input partitions according to their partition key order. This ensures // a canonical order from both sides of a bucketed join, for example. val partitionDataTypes = expressions.map(_.dataType) - val partitionOrdering: Ordering[(InternalRow, InputPartition)] = { - RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1) - } - val sortedKeyToPartitions = results.sorted(partitionOrdering) - val groupedPartitions = sortedKeyToPartitions + val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes) + val sortedKeyToPartitions = results.sorted(rowOrdering.on(_._1)) + val sortedGroupedPartitions = sortedKeyToPartitions .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2)) .groupBy(_._1) .toSeq .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) } + .sorted(rowOrdering.on(_.value)) - Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2))) + Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2))) } } } From 700ee83ca26feb72842631946950a5a3f96efdba Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 6 Sep 2023 21:17:30 -0700 Subject: [PATCH 2/2] fix compilation --- .../execution/datasources/v2/DataSourceV2ScanExecBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 936dbd2a153cb..b2f94cae2dfa7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -144,13 +144,13 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { // a canonical order from both sides of a bucketed join, for example. val partitionDataTypes = expressions.map(_.dataType) val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes) - val sortedKeyToPartitions = results.sorted(rowOrdering.on(_._1)) + val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1)) val sortedGroupedPartitions = sortedKeyToPartitions .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2)) .groupBy(_._1) .toSeq .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) } - .sorted(rowOrdering.on(_.value)) + .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value)) Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2))) }