fix: distributed RangePartitioning bounds calculation with native shuffle #2258
mbutrovich merged 47 commits into apache:main
Conversation
…for native shuffle to consume. Added new test to represent apache#1906.
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@            Coverage Diff             @@
## main #2258 +/- ##
============================================
+ Coverage 56.12% 58.46% +2.34%
- Complexity 976 1440 +464
============================================
Files 119 146 +27
Lines 11743 13520 +1777
Branches 2251 2351 +100
============================================
+ Hits 6591 7905 +1314
- Misses 4012 4381 +369
- Partials 1140 1234 +94
…sult in 1 partition.
…ow to handle dictionary encoding.
# Conflicts: # spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
…erate the partitioning scheme. This solves the issue where the input schema says it contains dictionaries that were later going to be unpacked by CopyExec. Will open an issue to understand why we even wrap the child in CopyExec in the first place.
# Conflicts: # native/core/src/execution/planner.rs
Updated this branch after merging and looks clean still.
That's effectively what the test
…_range_partitioning
MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId)
}

private def isSinglePartitioning(p: Partitioning): Boolean = p match {
The entire method might be simplified to just p.numPartitions <= 1?
The number of partition bounds can be less than the number of target partitions based on value cardinality, so we still need to check rangePartitionBounds.
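To illustrate that point, here is a hedged Rust sketch (not the PR's actual check): with range partitioning, the boundary rows can effectively address only bounds.len() + 1 partitions, so a plan can request many partitions and still collapse to one when the data has too few distinct values. The function name is hypothetical.

```rust
use arrow::row::OwnedRow;

/// Illustrative only: a range-partitioned shuffle degenerates to a single
/// partition whenever no boundary rows were produced, even if the requested
/// partition count is greater than one.
fn is_effectively_single_partition(num_partitions: usize, range_bounds: &[OwnedRow]) -> bool {
    // range_bounds.len() + 1 is the number of partitions the bounds can actually address.
    num_partitions <= 1 || range_bounds.is_empty()
}
```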
val numParts = rdd.getNumPartitions

// The code block below is mostly brought over from
// ShuffleExchangeExec::prepareShuffleDependency
It might be non-trivial to do so, but we could think about making this a plan that we execute on the native side: essentially your original range partitioner, but distributed.
    serializer: Serializer,
    metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
  val numParts = rdd.getNumPartitions
Can we report the time spent in this? It might be useful to decide if this is worthy of optimization.
I'll open a follow-up issue.
@@ -26,15 +27,15 @@ pub enum CometPartitioning {
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Allocate rows based on the lexical order of one or more expressions and the specified number of
I'm wondering whether it would be intuitive for the user to have
Arc<RowConverter>, Vec<OwnedRow>
here? 🤔
Could you expand on this please? I'm not sure I understand the requested change.
Sorry for the misleading comment.
I was thinking that, compared with the other variants like Hash,
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
it is quite intuitive that the hash depends on numPartitions and the expressions to be hashed.
For Range it is
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
which looks not as intuitive IMO, because when reading it you cannot tell the meaning of the last two params.
Anyway, this design question can be addressed in a follow-up if needed.
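A hedged sketch of the struct-style alternative discussed above: named fields make the last two parameters self-describing. The variant, field names, and import paths here are hypothetical and approximate, not the actual Comet definitions.

```rust
use std::sync::Arc;

use arrow::row::{OwnedRow, RowConverter};
use datafusion::physical_expr::{LexOrdering, PhysicalExpr};

/// Hypothetical sketch of a more self-documenting enum, not the actual Comet type.
pub enum CometPartitioningSketch {
    /// Allocate rows by hashing one or more expressions into `num_partitions` buckets.
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Allocate rows by comparing them against driver-computed boundary rows.
    Range {
        /// Sort order used both to compute the boundaries and to compare incoming rows.
        ordering: LexOrdering,
        /// Target number of output partitions (may exceed bounds.len() + 1 for low-cardinality data).
        num_partitions: usize,
        /// Converter that maps the sort columns of a batch into the Arrow row format.
        converter: Arc<RowConverter>,
        /// Partition boundary values, already in row format, sorted ascending.
        bounds: Vec<OwnedRow>,
    },
}
```

Whether the ergonomic gain is worth diverging from the tuple style used by the Hash variant is exactly the follow-up question raised above.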
// Create a RowConverter and use it to create OwnedRows from the arrays
let converter = RowConverter::new(sort_fields)?;
let rows = converter.convert_columns(&arrays)?;
let owned_rows: Vec<OwnedRow> = rows.iter().map(|row| row.owned()).collect();
Maybe we should add a comment here explaining what owned_rows is?
Is it the actual rows before shuffle?
For simplicity, attaching a diagram of the RR flow:
[ Before Shuffle ]
Executor 1: (5,A) (15,B) (30,C) Executor 2: (40,D) (70,E) (90,F)
| |
v v
[ Shuffle Write: Buckets on Disk ]
Executor 1: [P0:(5,15,30)] [P1: ] [P2: ] Executor 2: [P0: ] [P1:(40)] [P2:(70,90)]
| |
v v
[ Shuffle Read: Reducers Fetch Buckets ]
Reducer P0 <---- E1.Bucket0 + E2.Bucket0 ----> (5,15,30)
Reducer P1 <---- E1.Bucket1 + E2.Bucket1 ----> (40)
Reducer P2 <---- E1.Bucket2 + E2.Bucket2 ----> (70,90)
[ After Shuffle = Range Partitions ]
P0: (5,15,30) P1: (40) P2: (70,90)
owned_rows are the boundary values. I can make it more explicit.
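To make that concrete, below is a small, self-contained sketch (not the PR's actual code) of how boundary values computed up front can be turned into OwnedRows and then used to pick a partition for each incoming row. All names are illustrative, it assumes a single ascending Int32 sort key, and the inclusive/exclusive treatment of the bounds is an assumption.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use arrow::row::{OwnedRow, RowConverter, SortField};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Boundary values computed up front (e.g. by the driver): rows <= 35 go to
    // partition 0, rows <= 65 go to partition 1, everything else to partition 2.
    // (Which side of a boundary is inclusive is an assumption in this sketch.)
    let bounds_col: ArrayRef = Arc::new(Int32Array::from(vec![35, 65]));

    // Convert the boundary column(s) into the Arrow row format once.
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let bound_rows = converter.convert_columns(&[bounds_col])?;
    let owned_bounds: Vec<OwnedRow> = bound_rows.iter().map(|r| r.owned()).collect();

    // The sort column of an incoming batch on some executor.
    let batch_col: ArrayRef = Arc::new(Int32Array::from(vec![5, 40, 90]));
    let batch_rows = converter.convert_columns(&[batch_col])?;

    // For each row, the partition id is the number of bounds that sort strictly before it.
    let partition_ids: Vec<usize> = batch_rows
        .iter()
        .map(|row| owned_bounds.partition_point(|b| b.row() < row))
        .collect();

    assert_eq!(partition_ids, vec![0, 1, 2]);
    Ok(())
}
```

The key point is that because every executor receives the same precomputed bounds, they all agree on which partition a given key maps to, which is exactly what the distributed fix in this PR is about.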
| .doc("Whether to enable range partitioning for Comet native shuffle.") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
| .createWithDefault(true) |
Should we keep it as false, run some benchmarks and real tests with this param set to true, and only later enable it by default?
I discussed with @andygrove and we were comfortable merging with true back in June. I think if you're opting into native shuffle we should try to accelerate all partitioning schemes, and if we discover issues it can be toggled off.
Enabling it by default now gives us more opportunities to find bugs over the next few weeks before we release 0.11.0 and we can always disable if we find issues in that time.
comphead
left a comment
Thanks @mbutrovich, I left some minor comments, but overall I think the PR is good to go.
…fle/CometNativeShuffleWriter.scala Co-authored-by: Oleks V <comphead@users.noreply.github.com>
andygrove
left a comment
This is a significant improvement! Thanks @mbutrovich
Which issue does this PR close?
Closes #1906.
Rationale for this change
#1862 tried to implement RangePartitioning with native shuffle. The implementation for bounds calculation didn't work in a distributed setting because executors calculated their own partition boundaries.
What changes are included in this PR?
This modifies the flow so that the driver calculates the boundaries (as Spark does). At a high level:
ShuffleExchangeExec for using Spark's RangePartitioner to calculate boundary rows.
How are these changes tested?