From 53859d4c46619ab4cb3f449b9bfb88825b09900d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:28:06 -0600 Subject: [PATCH 1/7] feat: add SortBasedPartitioner for sort-based shuffle repartitioning Add a new shuffle partitioner that processes each batch immediately by sorting rows by partition ID using counting sort, slicing the sorted batch at partition boundaries, and writing each slice to per-partition spill files. This avoids per-partition builder memory overhead compared to the existing MultiPartitionShuffleRepartitioner. Also add spill_batch method to PartitionWriter for writing individual batches to spill files. --- native/shuffle/src/partitioners/mod.rs | 4 + native/shuffle/src/partitioners/sort_based.rs | 325 ++++++++++++++++++ native/shuffle/src/writers/spill.rs | 27 ++ 3 files changed, 356 insertions(+) create mode 100644 native/shuffle/src/partitioners/sort_based.rs diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index a0bc652b4b..b46b5dd4e2 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -19,10 +19,14 @@ mod empty_schema; mod multi_partition; mod partitioned_batch_iterator; mod single_partition; +mod sort_based; mod traits; pub(crate) use empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; +#[allow(unused_imports)] +// TODO: Remove once SortBasedPartitioner is wired into the shuffle writer factory +pub(crate) use sort_based::SortBasedPartitioner; pub(crate) use traits::ShufflePartitioner; diff --git a/native/shuffle/src/partitioners/sort_based.rs b/native/shuffle/src/partitioners/sort_based.rs new file mode 100644 index 0000000000..a9b427e728 --- /dev/null +++ b/native/shuffle/src/partitioners/sort_based.rs @@ -0,0 +1,325 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// TODO: Remove once SortBasedPartitioner is wired into the shuffle writer factory +#![allow(dead_code)] + +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::writers::PartitionWriter; +use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; +use arrow::compute::take; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +/// A shuffle repartitioner that sorts each batch by partition ID using counting sort, +/// then slices and writes per-partition sub-batches immediately. This avoids +/// per-partition Arrow builders, so memory usage is O(batch_size) regardless of +/// partition count. 
+pub(crate) struct SortBasedPartitioner { + output_data_file: String, + output_index_file: String, + #[allow(dead_code)] + schema: SchemaRef, + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, + partitioning: CometPartitioning, + runtime: Arc, + metrics: ShufflePartitionerMetrics, + batch_size: usize, + #[allow(dead_code)] + reservation: MemoryReservation, + write_buffer_size: usize, + hashes_buf: Vec, + partition_ids: Vec, + sorted_indices: Vec, + partition_starts: Vec, +} + +impl SortBasedPartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + write_buffer_size: usize, + ) -> datafusion::common::Result { + let num_output_partitions = partitioning.partition_count(); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + let partition_writers = (0..num_output_partitions) + .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .collect::>>()?; + let reservation = MemoryConsumer::new(format!("SortBasedPartitioner[{partition}]")) + .register(&runtime.memory_pool); + let hashes_buf = match partitioning { + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0u32; batch_size] + } + _ => vec![], + }; + Ok(Self { + output_data_file, + output_index_file, + schema, + partition_writers, + shuffle_block_writer, + partitioning, + runtime, + metrics, + batch_size, + reservation, + write_buffer_size, + hashes_buf, + partition_ids: vec![0u32; batch_size], + sorted_indices: vec![0u32; batch_size], + partition_starts: vec![0usize; num_output_partitions + 1], + }) + } + + fn compute_partition_ids_and_sort( + &mut self, + input: &RecordBatch, + ) -> datafusion::common::Result<()> { + let num_rows = input.num_rows(); + let 
num_partitions = self.partitioning.partition_count(); + + match &self.partitioning { + CometPartitioning::Hash(exprs, num_output_partitions) => { + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(input)?.into_array(num_rows)) + .collect::>>()?; + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&arrays, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + } + } + CometPartitioning::RangePartitioning( + lex_ordering, + _num_output_partitions, + row_converter, + bounds, + ) => { + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(input)?.into_array(num_rows)) + .collect::>>()?; + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut self.partition_ids[..num_rows]; + row_batch.iter().enumerate().for_each(|(row_idx, row)| { + partition_ids[row_idx] = bounds + .as_slice() + .partition_point(|bound| bound.row() <= row) + as u32; + }); + } + CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { + let num_columns_to_hash = if *max_hash_columns == 0 { + input.num_columns() + } else { + (*max_hash_columns).min(input.num_columns()) + }; + let columns_to_hash: Vec = (0..num_columns_to_hash) + .map(|i| Arc::clone(input.column(i))) + .collect(); + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + } + } + other => { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))); + } + } + + // Counting sort + let partition_starts = &mut 
self.partition_starts[..num_partitions + 1]; + partition_starts.fill(0); + let partition_ids = &self.partition_ids[..num_rows]; + for &pid in partition_ids.iter() { + partition_starts[pid as usize + 1] += 1; + } + for i in 1..=num_partitions { + partition_starts[i] += partition_starts[i - 1]; + } + let sorted_indices = &mut self.sorted_indices[..num_rows]; + let mut cursors = partition_starts.to_vec(); + for (row_idx, &pid) in partition_ids.iter().enumerate() { + let pos = cursors[pid as usize]; + sorted_indices[pos] = row_idx as u32; + cursors[pid as usize] += 1; + } + Ok(()) + } + + fn process_batch(&mut self, input: RecordBatch) -> datafusion::common::Result<()> { + if input.num_rows() == 0 { + return Ok(()); + } + let num_rows = input.num_rows(); + let num_partitions = self.partitioning.partition_count(); + + self.metrics.data_size.add(input.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + { + let repart_start = Instant::now(); + self.compute_partition_ids_and_sort(&input)?; + self.metrics + .repart_time + .add_duration(repart_start.elapsed()); + } + + let sorted_indices = &self.sorted_indices[..num_rows]; + let partition_starts = &self.partition_starts[..num_partitions + 1]; + let indices_array = UInt32Array::from_iter_values(sorted_indices.iter().copied()); + let sorted_batch = RecordBatch::try_new( + input.schema(), + input + .columns() + .iter() + .map(|col| take(col, &indices_array, None)) + .collect::, _>>()?, + )?; + + for partition_id in 0..num_partitions { + let start = partition_starts[partition_id]; + let end = partition_starts[partition_id + 1]; + let len = end - start; + if len == 0 { + continue; + } + let partition_batch = sorted_batch.slice(start, len); + self.partition_writers[partition_id].spill_batch( + &partition_batch, + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for SortBasedPartitioner { + async 
fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let slice = batch.slice(start, end - start); + self.process_batch(slice)?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let num_output_partitions = self.partition_writers.len(); + let mut offsets = vec![0i64; num_output_partitions + 1]; + let data_file = self.output_data_file.clone(); + let index_file = self.output_index_file.clone(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut output_data = BufWriter::new(output_data); + + for (i, partition_writer) in self + .partition_writers + .iter() + .enumerate() + .take(num_output_partitions) + { + offsets[i] = output_data.stream_position()? as i64; + if let Some(spill_path) = partition_writer.path() { + let mut spill_file = File::open(spill_path)?; + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut output_data)?; + write_timer.stop(); + } + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + offsets[num_output_partitions] = output_data.stream_position()? 
as i64; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + for offset in offsets { + output_index.write_all(&offset.to_le_bytes()[..])?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} + +impl Debug for SortBasedPartitioner { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SortBasedPartitioner") + .field("partitions", &self.partition_writers.len()) + .finish() + } +} diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/spill.rs index c16caddbf9..46f8588cf9 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/spill.rs @@ -19,6 +19,7 @@ use super::ShuffleBlockWriter; use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::PartitionedBatchIterator; use crate::writers::buf_batch_writer::BufBatchWriter; +use arrow::array::RecordBatch; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; @@ -113,6 +114,32 @@ impl PartitionWriter { } } + /// Write a single batch to this partition's spill file. 
+ #[allow(dead_code)] // TODO: Remove once SortBasedPartitioner is wired in + pub(crate) fn spill_batch( + &mut self, + batch: &RecordBatch, + runtime: &RuntimeEnv, + metrics: &ShufflePartitionerMetrics, + write_buffer_size: usize, + batch_size: usize, + ) -> datafusion::common::Result { + if batch.num_rows() == 0 { + return Ok(0); + } + self.ensure_spill_file_created(runtime)?; + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, + batch_size, + ); + let bytes_written = + buf_batch_writer.write(batch, &metrics.encode_time, &metrics.write_time)?; + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + Ok(bytes_written) + } + pub(crate) fn path(&self) -> Option<&std::path::Path> { self.spill_file .as_ref() From ec2820c142418ca5b31b4225f34e426cba1268ef Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:33:57 -0600 Subject: [PATCH 2/7] feat: wire SortBasedPartitioner into shuffle writer and add tests Add sort_based parameter to ShuffleWriterExec and external_shuffle to enable sort-based partitioning as an alternative to the default multi-partition hash repartitioner. When sort_based is true and more than one partition is requested, the SortBasedPartitioner is used. Add test cases for sort-based shuffle covering basic operation, larger and smaller batch sizes, and large numbers of partitions. 
--- native/core/src/execution/planner.rs | 1 + native/shuffle/benches/shuffle_writer.rs | 1 + native/shuffle/src/bin/shuffle_bench.rs | 1 + native/shuffle/src/partitioners/mod.rs | 2 - native/shuffle/src/partitioners/sort_based.rs | 5 -- native/shuffle/src/shuffle_writer.rs | 69 ++++++++++++++++--- 6 files changed, 63 insertions(+), 16 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ac35925ace..b31c21f0d9 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1379,6 +1379,7 @@ impl PhysicalPlanner { writer.output_index_file.clone(), writer.tracing_enabled, write_buffer_size, + false, )?); Ok(( diff --git a/native/shuffle/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs index 27abd919fa..e4e7940630 100644 --- a/native/shuffle/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -153,6 +153,7 @@ fn create_shuffle_writer_exec( "/tmp/index.out".to_string(), false, 1024 * 1024, + false, ) .unwrap() } diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index bb8c2a0380..9ea5e75c03 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -477,6 +477,7 @@ async fn execute_shuffle_write( index_file, false, write_buffer_size, + false, ) .expect("Failed to create ShuffleWriterExec"); diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index b46b5dd4e2..c15e44f07a 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -26,7 +26,5 @@ pub(crate) use empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; -#[allow(unused_imports)] -// TODO: Remove once SortBasedPartitioner is 
wired into the shuffle writer factory pub(crate) use sort_based::SortBasedPartitioner; pub(crate) use traits::ShufflePartitioner; diff --git a/native/shuffle/src/partitioners/sort_based.rs b/native/shuffle/src/partitioners/sort_based.rs index a9b427e728..e0322ef796 100644 --- a/native/shuffle/src/partitioners/sort_based.rs +++ b/native/shuffle/src/partitioners/sort_based.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -// TODO: Remove once SortBasedPartitioner is wired into the shuffle writer factory -#![allow(dead_code)] - use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; use crate::writers::PartitionWriter; @@ -43,7 +40,6 @@ use tokio::time::Instant; pub(crate) struct SortBasedPartitioner { output_data_file: String, output_index_file: String, - #[allow(dead_code)] schema: SchemaRef, partition_writers: Vec, shuffle_block_writer: ShuffleBlockWriter, @@ -51,7 +47,6 @@ pub(crate) struct SortBasedPartitioner { runtime: Arc, metrics: ShufflePartitionerMetrics, batch_size: usize, - #[allow(dead_code)] reservation: MemoryReservation, write_buffer_size: usize, hashes_buf: Vec, diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 8502c79624..91a8e46502 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -20,7 +20,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, - SinglePartitionShufflePartitioner, + SinglePartitionShufflePartitioner, SortBasedPartitioner, }; use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; @@ -67,6 +67,8 @@ pub struct ShuffleWriterExec { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// Whether to use sort-based partitioning + sort_based: bool, } impl ShuffleWriterExec { @@ -80,6 +82,7 @@ 
impl ShuffleWriterExec { output_index_file: String, tracing_enabled: bool, write_buffer_size: usize, + sort_based: bool, ) -> Result { let cache = Arc::new(PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -98,6 +101,7 @@ impl ShuffleWriterExec { codec, tracing_enabled, write_buffer_size, + sort_based, }) } } @@ -158,6 +162,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_index_file.clone(), self.tracing_enabled, self.write_buffer_size, + self.sort_based, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -185,6 +190,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.codec.clone(), self.tracing_enabled, self.write_buffer_size, + self.sort_based, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -205,6 +211,7 @@ async fn external_shuffle( codec: CompressionCodec, tracing_enabled: bool, write_buffer_size: usize, + sort_based: bool, ) -> Result { let schema = input.schema(); @@ -229,6 +236,18 @@ async fn external_shuffle( codec, write_buffer_size, )?), + _ if sort_based => Box::new(SortBasedPartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + write_buffer_size, + )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, @@ -312,34 +331,61 @@ mod test { #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_single_partition_shuffle_writer() { - shuffle_write_test(1000, 100, 1, None); - shuffle_write_test(10000, 10, 1, None); + shuffle_write_test(1000, 100, 1, None, false); + shuffle_write_test(10000, 10, 1, None, false); } #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_insert_larger_batch() { - shuffle_write_test(10000, 1, 16, None); + shuffle_write_test(10000, 1, 16, None, false); } #[test] #[cfg_attr(miri, ignore)] // miri 
can't call foreign function `ZSTD_createCCtx` fn test_insert_smaller_batch() { - shuffle_write_test(1000, 1, 16, None); - shuffle_write_test(1000, 10, 16, None); + shuffle_write_test(1000, 1, 16, None, false); + shuffle_write_test(1000, 10, 16, None, false); } #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_large_number_of_partitions() { - shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024)); - shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024)); + shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024), false); + shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024), false); } #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_large_number_of_partitions_spilling() { - shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)); + shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024), false); + } + + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_sort_based_basic() { + shuffle_write_test(1000, 100, 1, None, true); + shuffle_write_test(10000, 10, 1, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_sort_based_insert_larger_batch() { + shuffle_write_test(10000, 1, 16, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_sort_based_insert_smaller_batch() { + shuffle_write_test(1000, 1, 16, None, true); + shuffle_write_test(1000, 10, 16, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn test_sort_based_large_number_of_partitions() { + shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024), true); + shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024), true); } #[tokio::test] @@ -403,6 +449,7 @@ mod test { num_batches: usize, num_partitions: usize, memory_limit: Option, + sort_based: bool, ) 
{ let batch = create_batch(batch_size); @@ -467,6 +514,7 @@ mod test { "/tmp/index.out".to_string(), false, 1024 * 1024, // write_buffer_size: 1MB default + sort_based, ) .unwrap(); @@ -526,6 +574,7 @@ mod test { index_file.clone(), false, 1024 * 1024, + false, ) .unwrap(); @@ -730,6 +779,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, ) .unwrap(); @@ -818,6 +868,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, ) .unwrap(); From dd3cfb3e6cb6d71945884f17bead35e4ca5f0563 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:37:31 -0600 Subject: [PATCH 3/7] feat: add sort mode to shuffle benchmark --- native/shuffle/src/bin/shuffle_bench.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index 9ea5e75c03..e7856a6a64 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -114,6 +114,11 @@ struct Args { /// Each task reads the same input and writes to its own output files. #[arg(long, default_value_t = 1)] concurrent_tasks: usize, + + /// Shuffle mode: 'buffered' buffers all rows before writing (default), + /// 'sort' sorts each batch by partition ID and writes immediately. 
+ #[arg(long, default_value = "buffered")] + mode: String, } fn main() { @@ -141,6 +146,7 @@ fn main() { println!("Partitioning: {}", args.partitioning); println!("Partitions: {}", args.partitions); println!("Codec: {:?}", codec); + println!("Mode: {}", args.mode); println!("Hash columns: {:?}", hash_col_indices); if let Some(mem_limit) = args.memory_limit { println!("Memory limit: {}", format_bytes(mem_limit)); @@ -403,6 +409,7 @@ fn run_shuffle_write( let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let start = Instant::now(); + let sort_based = args.mode == "sort"; let (shuffle_metrics, input_metrics) = execute_shuffle_write( input_path.to_str().unwrap(), codec.clone(), @@ -413,6 +420,7 @@ fn run_shuffle_write( args.limit, data_file.to_string(), index_file.to_string(), + sort_based, ) .await .unwrap(); @@ -436,6 +444,7 @@ async fn execute_shuffle_write( limit: usize, data_file: String, index_file: String, + sort_based: bool, ) -> datafusion::common::Result<(MetricsSet, MetricsSet)> { let config = SessionConfig::new().with_batch_size(batch_size); let mut runtime_builder = RuntimeEnvBuilder::new(); @@ -477,7 +486,7 @@ async fn execute_shuffle_write( index_file, false, write_buffer_size, - false, + sort_based, ) .expect("Failed to create ShuffleWriterExec"); @@ -542,6 +551,7 @@ fn run_concurrent_shuffle_writes( let memory_limit = args.memory_limit; let write_buffer_size = args.write_buffer_size; let limit = args.limit; + let sort_based = args.mode == "sort"; handles.push(tokio::spawn(async move { execute_shuffle_write( @@ -554,6 +564,7 @@ fn run_concurrent_shuffle_writes( limit, data_file, index_file, + sort_based, ) .await .unwrap() From 4860bf2ae27906722f4e80282212f52e740e1e53 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:38:49 -0600 Subject: [PATCH 4/7] feat: wire up sort_based shuffle config from JVM to native Add COMET_SHUFFLE_SORT_BASED config option and pass it through protobuf to the native ShuffleWriterExec, 
replacing the hardcoded `false` value. --- common/src/main/scala/org/apache/comet/CometConf.scala | 10 ++++++++++ native/core/src/execution/planner.rs | 3 ++- native/proto/src/proto/operator.proto | 4 ++++ .../execution/shuffle/CometNativeShuffleWriter.scala | 1 + 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..fa7f081f17 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -534,6 +534,16 @@ object CometConf extends ShimCometConf { .checkValue(v => v > 0, "Write buffer size must be positive") .createWithDefault(1) + val COMET_SHUFFLE_SORT_BASED: ConfigEntry[Boolean] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.sort_based") + .category(CATEGORY_SHUFFLE) + .doc( + "When enabled, uses sort-based repartitioning for native shuffle. " + + "This avoids per-partition memory overhead from builders, making it more " + + "memory-efficient for large partition counts. 
Default is false (uses buffered mode).") + .booleanConf + .createWithDefault(false) + val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") .category(CATEGORY_SHUFFLE) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b31c21f0d9..8d2dc58f60 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1371,6 +1371,7 @@ impl PhysicalPlanner { }?; let write_buffer_size = writer.write_buffer_size as usize; + let sort_based = writer.sort_based; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1379,7 +1380,7 @@ impl PhysicalPlanner { writer.output_index_file.clone(), writer.tracing_enabled, write_buffer_size, - false, + sort_based, )?); Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index fb438b26a4..9d2b29c1b8 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -294,6 +294,10 @@ message ShuffleWriter { // Size of the write buffer in bytes used when writing shuffle data to disk. // Larger values may improve write performance but use more memory. int32 write_buffer_size = 8; + // When true, uses sort-based repartitioning for native shuffle. + // This avoids per-partition memory overhead from builders, making it more + // memory-efficient for large partition counts. 
+ bool sort_based = 9; } message ParquetWriter { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index f27d021ac4..0d7091eae0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -192,6 +192,7 @@ class CometNativeShuffleWriter[K, V]( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) shuffleWriterBuilder.setWriteBufferSize( CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().min(Int.MaxValue).toInt) + shuffleWriterBuilder.setSortBased(CometConf.COMET_SHUFFLE_SORT_BASED.get()) outputPartitioning match { case p if isSinglePartitioning(p) => From e47a5fd371d2affbf54b31a7c09e74889e54bc6b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:40:35 -0600 Subject: [PATCH 5/7] chore: fix clippy warnings and format --- benchmarks/generate_charts.py | 75 ++ benchmarks/run_shuffle_bench.sh | 49 ++ .../2026-04-13-sort-based-repartitioning.md | 767 ++++++++++++++++++ native/shuffle/src/partitioners/sort_based.rs | 9 +- 4 files changed, 894 insertions(+), 6 deletions(-) create mode 100644 benchmarks/generate_charts.py create mode 100755 benchmarks/run_shuffle_bench.sh create mode 100644 docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md diff --git a/benchmarks/generate_charts.py b/benchmarks/generate_charts.py new file mode 100644 index 0000000000..bea02765e6 --- /dev/null +++ b/benchmarks/generate_charts.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +"""Generate peak memory and throughput charts from shuffle benchmark results.""" + +import csv +import matplotlib.pyplot as plt +import numpy as np + +CSV_FILE = "/tmp/shuffle_bench_results_partitions/results.csv" +OUTPUT_DIR = "/tmp/shuffle_bench_results_partitions" + +rows = [] +with 
open(CSV_FILE) as f:
+    reader = csv.DictReader(f)
+    for row in reader:
+        rows.append(row)
+
+labels = []
+buffered_mem = []
+immediate_mem = []
+buffered_tp = []
+immediate_tp = []
+
+for row in rows:
+    if row["mode"] == "buffered":
+        labels.append(str(row["partitions"]))
+        buffered_mem.append(int(row["peak_memory_bytes"]) / (1024**3))
+        buffered_tp.append(int(row["throughput_rows_per_sec"]) / 1e6)
+
+for row in rows:
+    if row["mode"] == "sort":
+        immediate_mem.append(int(row["peak_memory_bytes"]) / (1024**3))
+        immediate_tp.append(int(row["throughput_rows_per_sec"]) / 1e6)
+
+x = np.arange(len(labels))
+width = 0.35
+
+plt.rcParams.update({
+    "figure.facecolor": "white",
+    "axes.facecolor": "white",
+    "axes.grid": True,
+    "grid.alpha": 0.3,
+    "font.size": 12,
+})
+
+# Chart 1: Peak Memory
+fig, ax = plt.subplots(figsize=(10, 6))
+bars1 = ax.bar(x - width/2, buffered_mem, width, label="buffered", color="#4C78A8")
+bars2 = ax.bar(x + width/2, immediate_mem, width, label="sort", color="#E45756")
+ax.set_xlabel("Output Partitions")
+ax.set_ylabel("Peak RSS (GiB)")
+ax.set_title("Shuffle Peak Memory Usage\n(4 GB memory limit, 100M rows, lz4, hash on cols 0,3)")
+ax.set_xticks(x)
+ax.set_xticklabels(labels)
+ax.legend()
+ax.bar_label(bars1, fmt="%.1f", padding=3, fontsize=9)
+ax.bar_label(bars2, fmt="%.1f", padding=3, fontsize=9)
+fig.tight_layout()
+fig.savefig(f"{OUTPUT_DIR}/shuffle_peak_memory.png", dpi=150)
+print(f"Saved {OUTPUT_DIR}/shuffle_peak_memory.png")
+
+# Chart 2: Throughput
+fig, ax = plt.subplots(figsize=(10, 6))
+bars1 = ax.bar(x - width/2, buffered_tp, width, label="buffered", color="#4C78A8")
+bars2 = ax.bar(x + width/2, immediate_tp, width, label="sort", color="#E45756")
+ax.set_xlabel("Output Partitions")
+ax.set_ylabel("Throughput (M rows/s)")
+ax.set_title("Shuffle Throughput\n(4 GB memory limit, 100M rows, lz4, hash on cols 0,3)")
+ax.set_xticks(x)
+ax.set_xticklabels(labels)
+ax.legend()
+ax.bar_label(bars1, fmt="%.2f", padding=3, fontsize=9)
+ax.bar_label(bars2, fmt="%.2f", padding=3, fontsize=9)
+fig.tight_layout()
+fig.savefig(f"{OUTPUT_DIR}/shuffle_throughput.png", dpi=150)
+print(f"Saved {OUTPUT_DIR}/shuffle_throughput.png")
diff --git a/benchmarks/run_shuffle_bench.sh b/benchmarks/run_shuffle_bench.sh
new file mode 100755
index 0000000000..d3952343b1
--- /dev/null
+++ b/benchmarks/run_shuffle_bench.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+# Run shuffle benchmarks: buffered vs sort-based, varying partition counts, 4GB memory limit
+
+set -e
+
+BENCH_BIN="./native/target/release/shuffle_bench"
+INPUT="/opt/tpch/sf100/lineitem"
+LIMIT=100000000
+CODEC="lz4"
+HASH_COLUMNS="0,3"
+MEMORY_LIMIT=4294000000
+RESULTS_DIR="/tmp/shuffle_bench_results_partitions"
+
+mkdir -p "$RESULTS_DIR"
+
+PARTITIONS=(200 400 800)
+MODES=("buffered" "sort")
+
+CSV_FILE="$RESULTS_DIR/results.csv"
+echo "mode,partitions,peak_memory_bytes,throughput_rows_per_sec,avg_time_secs" > "$CSV_FILE"
+
+for mode in "${MODES[@]}"; do
+    for parts in "${PARTITIONS[@]}"; do
+        echo "=== Running: mode=$mode partitions=$parts ==="
+        OUT_FILE="$RESULTS_DIR/${mode}_${parts}.txt"
+        TIME_FILE="$RESULTS_DIR/${mode}_${parts}_time.txt"
+
+        /usr/bin/time -l "$BENCH_BIN" \
+            --input "$INPUT" \
+            --limit "$LIMIT" \
+            --partitions "$parts" \
+            --codec "$CODEC" \
+            --hash-columns "$HASH_COLUMNS" \
+            --memory-limit "$MEMORY_LIMIT" \
+            --mode "$mode" \
+            > "$OUT_FILE" 2> "$TIME_FILE"
+
+        peak_mem=$(grep "maximum resident set size" "$TIME_FILE" | awk '{print $1}')
+        throughput=$(grep "throughput:" "$OUT_FILE" | sed 's/.*throughput: *//; s/ rows\/s.*//' | tr -d ',')
+        avg_time=$(grep "avg time:" "$OUT_FILE" | sed 's/.*avg time: *//; s/s$//')
+
+        echo "  Peak memory: $peak_mem bytes, Throughput: $throughput rows/s, Avg time: ${avg_time}s"
+        echo "$mode,$parts,$peak_mem,$throughput,$avg_time" >> "$CSV_FILE"
+    done
+done
+
+echo ""
+echo "Results saved to $CSV_FILE"
+cat "$CSV_FILE"
diff --git 
a/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md b/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md new file mode 100644 index 0000000000..8032656bbc --- /dev/null +++ b/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md @@ -0,0 +1,767 @@ +# Sort-Based Shuffle Repartitioning Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a sort-based shuffle repartitioner that partitions each batch on-the-fly using counting sort + Arrow `take`/`slice`, avoiding per-partition builder memory overhead. + +**Architecture:** A new `SortBasedPartitioner` implements `ShufflePartitioner`. On each `insert_batch`, it computes partition IDs, counting-sorts the batch by partition, slices it into per-partition sub-batches, and writes compressed IPC blocks to per-partition `PartitionWriter`s. Memory usage is O(batch_size) not O(batch_size x num_partitions). On `shuffle_write`, it concatenates per-partition spill files into the final data+index files. 
+ +**Tech Stack:** Rust, Arrow (`arrow::compute::take`), existing shuffle infrastructure (`ShuffleBlockWriter`, `PartitionWriter`, `BufBatchWriter`) + +--- + +### Task 1: Create `SortBasedPartitioner` struct and constructor + +**Files:** +- Create: `native/shuffle/src/partitioners/sort_based.rs` +- Modify: `native/shuffle/src/partitioners/mod.rs` + +- [ ] **Step 1: Create the module file with struct definition** + +Create `native/shuffle/src/partitioners/sort_based.rs`: + +```rust +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::writers::PartitionWriter; +use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; +use arrow::compute::take; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::metrics::Time; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +/// A shuffle repartitioner that sorts each batch by partition ID using counting sort, +/// then slices and writes per-partition sub-batches immediately. This avoids +/// per-partition Arrow builders, so memory usage is O(batch_size) regardless of +/// partition count. 
+pub(crate) struct SortBasedPartitioner { + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + /// One writer per output partition (manages spill files) + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, + partitioning: CometPartitioning, + runtime: Arc, + metrics: ShufflePartitionerMetrics, + /// The configured batch size + batch_size: usize, + /// Memory reservation for tracking + reservation: MemoryReservation, + /// Size of the write buffer in bytes + write_buffer_size: usize, + /// Reusable scratch buffers + hashes_buf: Vec, + partition_ids: Vec, + sorted_indices: Vec, + /// Partition boundary offsets: partition_starts[i]..partition_starts[i+1] + /// gives the range in sorted_indices for partition i + partition_starts: Vec, +} +``` + +- [ ] **Step 2: Implement constructor** + +Add to the same file: + +```rust +impl SortBasedPartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + write_buffer_size: usize, + ) -> datafusion::common::Result { + let num_output_partitions = partitioning.partition_count(); + + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + + let partition_writers = (0..num_output_partitions) + .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .collect::>>()?; + + let reservation = + MemoryConsumer::new(format!("SortBasedPartitioner[{partition}]")) + .register(&runtime.memory_pool); + + let hashes_buf = match partitioning { + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0u32; batch_size] + } + _ => vec![], + }; + + Ok(Self { + output_data_file, + output_index_file, + schema, + partition_writers, + shuffle_block_writer, + partitioning, + runtime, + 
metrics, + batch_size, + reservation, + write_buffer_size, + hashes_buf, + partition_ids: vec![0u32; batch_size], + sorted_indices: vec![0u32; batch_size], + partition_starts: vec![0usize; num_output_partitions + 1], + }) + } +} +``` + +- [ ] **Step 3: Register module in mod.rs** + +Modify `native/shuffle/src/partitioners/mod.rs` — add after `mod single_partition;`: + +```rust +mod sort_based; +``` + +And add to the pub use section: + +```rust +pub(crate) use sort_based::SortBasedPartitioner; +``` + +- [ ] **Step 4: Verify it compiles** + +Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` +Expected: Compiles (with dead code warnings, which is fine) + +- [ ] **Step 5: Commit** + +```bash +git add native/shuffle/src/partitioners/sort_based.rs native/shuffle/src/partitioners/mod.rs +git commit -m "feat: add SortBasedPartitioner struct and constructor" +``` + +--- + +### Task 2: Implement partition ID computation and counting sort + +**Files:** +- Modify: `native/shuffle/src/partitioners/sort_based.rs` + +- [ ] **Step 1: Add the counting sort method** + +This method computes partition IDs for each row, then uses counting sort to produce sorted indices and partition boundary offsets. Add to the `impl SortBasedPartitioner` block: + +```rust + /// Compute partition IDs for each row and counting-sort the indices by partition. + /// After this call: + /// - `self.sorted_indices[self.partition_starts[p]..self.partition_starts[p+1]]` + /// contains the row indices belonging to partition `p`. 
+ fn compute_partition_ids_and_sort( + &mut self, + input: &RecordBatch, + ) -> datafusion::common::Result<()> { + let num_rows = input.num_rows(); + let num_partitions = self.partitioning.partition_count(); + + match &self.partitioning { + CometPartitioning::Hash(exprs, num_output_partitions) => { + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(input)?.into_array(num_rows)) + .collect::>>()?; + + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&arrays, hashes_buf)?; + + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + } + } + CometPartitioning::RangePartitioning( + lex_ordering, + _num_output_partitions, + row_converter, + bounds, + ) => { + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(input)?.into_array(num_rows)) + .collect::>>()?; + + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut self.partition_ids[..num_rows]; + row_batch.iter().enumerate().for_each(|(row_idx, row)| { + partition_ids[row_idx] = bounds + .as_slice() + .partition_point(|bound| bound.row() <= row) + as u32; + }); + } + CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { + let num_columns_to_hash = if *max_hash_columns == 0 { + input.num_columns() + } else { + (*max_hash_columns).min(input.num_columns()) + }; + let columns_to_hash: Vec = (0..num_columns_to_hash) + .map(|i| Arc::clone(input.column(i))) + .collect(); + + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + } + } + other => { + return 
Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))); + } + } + + // Counting sort: count rows per partition + let partition_starts = &mut self.partition_starts[..num_partitions + 1]; + partition_starts.fill(0); + let partition_ids = &self.partition_ids[..num_rows]; + for &pid in partition_ids.iter() { + partition_starts[pid as usize + 1] += 1; + } + + // Prefix sum to get start offsets + for i in 1..=num_partitions { + partition_starts[i] += partition_starts[i - 1]; + } + + // Place indices into sorted order + let sorted_indices = &mut self.sorted_indices[..num_rows]; + // We need a mutable copy of the starts to use as cursors + // Use partition_starts as-is, advancing each partition's cursor + let mut cursors = partition_starts.to_vec(); + for (row_idx, &pid) in partition_ids.iter().enumerate() { + let pos = cursors[pid as usize]; + sorted_indices[pos] = row_idx as u32; + cursors[pid as usize] += 1; + } + + Ok(()) + } +``` + +- [ ] **Step 2: Verify it compiles** + +Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` +Expected: Compiles + +- [ ] **Step 3: Commit** + +```bash +git add native/shuffle/src/partitioners/sort_based.rs +git commit -m "feat: add partition ID computation and counting sort" +``` + +--- + +### Task 3: Implement `insert_batch` — sort, slice, and write per-partition + +**Files:** +- Modify: `native/shuffle/src/partitioners/sort_based.rs` + +- [ ] **Step 1: Add the process_batch method** + +This is the core method that sorts a single batch by partition and writes each partition's slice. Add to the `impl SortBasedPartitioner` block: + +```rust + /// Process a single batch: compute partition IDs, sort, slice by partition, + /// and write each slice to the corresponding partition writer. 
+ fn process_batch(&mut self, input: RecordBatch) -> datafusion::common::Result<()> { + if input.num_rows() == 0 { + return Ok(()); + } + + let num_rows = input.num_rows(); + let num_partitions = self.partitioning.partition_count(); + + // Update metrics + self.metrics.data_size.add(input.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + // Phase 1: Compute partition IDs and counting-sort indices + { + let mut timer = self.metrics.repart_time.timer(); + self.compute_partition_ids_and_sort(&input)?; + timer.stop(); + } + + // Phase 2: Reorder the batch using take, then slice by partition boundaries + let sorted_indices = &self.sorted_indices[..num_rows]; + let partition_starts = &self.partition_starts[..num_partitions + 1]; + + let indices_array = UInt32Array::from_iter_values(sorted_indices.iter().copied()); + let sorted_batch = RecordBatch::try_new( + input.schema(), + input + .columns() + .iter() + .map(|col| take(col, &indices_array, None)) + .collect::, _>>()?, + )?; + + // Phase 3: Slice the sorted batch at partition boundaries and write each slice + for partition_id in 0..num_partitions { + let start = partition_starts[partition_id]; + let end = partition_starts[partition_id + 1]; + let len = end - start; + if len == 0 { + continue; + } + + let partition_batch = sorted_batch.slice(start, len); + self.partition_writers[partition_id].spill_batch( + &partition_batch, + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } + + Ok(()) + } +``` + +- [ ] **Step 2: Add `spill_batch` method to `PartitionWriter`** + +Modify `native/shuffle/src/writers/spill.rs` to add a method that writes a single batch (not an iterator). Add after the existing `spill` method: + +```rust + /// Write a single batch to this partition's spill file. 
+ pub(crate) fn spill_batch( + &mut self, + batch: &RecordBatch, + runtime: &RuntimeEnv, + metrics: &ShufflePartitionerMetrics, + write_buffer_size: usize, + batch_size: usize, + ) -> datafusion::common::Result { + if batch.num_rows() == 0 { + return Ok(0); + } + self.ensure_spill_file_created(runtime)?; + + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, + batch_size, + ); + let bytes_written = + buf_batch_writer.write(batch, &metrics.encode_time, &metrics.write_time)?; + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + Ok(bytes_written) + } +``` + +- [ ] **Step 3: Implement the `ShufflePartitioner` trait** + +Add to `sort_based.rs`: + +```rust +#[async_trait::async_trait] +impl ShufflePartitioner for SortBasedPartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let slice = batch.slice(start, end - start); + self.process_batch(slice)?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let num_output_partitions = self.partition_writers.len(); + let mut offsets = vec![0i64; num_output_partitions + 1]; + + let data_file = self.output_data_file.clone(); + let index_file = self.output_index_file.clone(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let mut output_data = BufWriter::new(output_data); + + for i in 0..num_output_partitions { + offsets[i] = output_data.stream_position()? 
as i64; + + // Copy spill file contents to final data file + if let Some(spill_path) = self.partition_writers[i].path() { + let mut spill_file = File::open(spill_path)?; + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut output_data)?; + write_timer.stop(); + } + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + offsets[num_output_partitions] = output_data.stream_position()? as i64; + + // Write index file + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = + BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {e:?}")) + })?); + for offset in offsets { + output_index.write_all(&offset.to_le_bytes()[..])?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } +} + +impl Debug for SortBasedPartitioner { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SortBasedPartitioner") + .field("partitions", &self.partition_writers.len()) + .finish() + } +} +``` + +- [ ] **Step 4: Verify it compiles** + +Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` +Expected: Compiles + +- [ ] **Step 5: Commit** + +```bash +git add native/shuffle/src/partitioners/sort_based.rs native/shuffle/src/writers/spill.rs +git commit -m "feat: implement insert_batch and shuffle_write for SortBasedPartitioner" +``` + +--- + +### Task 4: Wire up `SortBasedPartitioner` in the shuffle writer + +**Files:** +- Modify: `native/shuffle/src/shuffle_writer.rs` + +- [ ] **Step 1: Add sort-based mode to the shuffle_write function** + +In `native/shuffle/src/shuffle_writer.rs`, the `shuffle_write` function (line ~196) currently constructs a `MultiPartitionShuffleRepartitioner` as the default for multi-partition cases. 
We need to add a `sort_based` boolean parameter and use `SortBasedPartitioner` when true. + +First, update the `ShuffleWriterExec` struct to include the new flag. Add field after `write_buffer_size`: + +```rust + /// Whether to use sort-based repartitioning + sort_based: bool, +``` + +Update `try_new` to accept the new parameter (add `sort_based: bool` as the last parameter) and store it in the struct. + +Update `with_children` to pass `self.sort_based` through. + +Then modify the `shuffle_write` function signature to add `sort_based: bool` parameter, and update the match arm for multi-partition: + +```rust + _ if sort_based => Box::new(SortBasedPartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + write_buffer_size, + )?), + _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( +``` + +Add the import at the top of the file: + +```rust +use crate::partitioners::SortBasedPartitioner; +``` + +- [ ] **Step 2: Update existing tests to pass `sort_based: false`** + +All existing `ShuffleWriterExec::try_new` calls in tests need the new `false` parameter added. + +- [ ] **Step 3: Verify it compiles and existing tests pass** + +Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle 2>&1 | tail -20` +Expected: All existing tests pass + +- [ ] **Step 4: Commit** + +```bash +git add native/shuffle/src/shuffle_writer.rs +git commit -m "feat: wire SortBasedPartitioner into shuffle writer" +``` + +--- + +### Task 5: Add tests for sort-based partitioner + +**Files:** +- Modify: `native/shuffle/src/shuffle_writer.rs` (test module) + +- [ ] **Step 1: Create a parameterized test helper** + +Add a new test helper that runs the existing `shuffle_write_test` logic but with `sort_based: true`. 
The simplest approach is to parameterize the existing `shuffle_write_test` function: + +Rename the existing `shuffle_write_test` to accept a `sort_based` parameter: + +```rust + fn shuffle_write_test( + batch_size: usize, + num_batches: usize, + num_partitions: usize, + memory_limit: Option<usize>, + sort_based: bool, + ) { +``` + +And pass it through to `ShuffleWriterExec::try_new`. + +Update all existing callers to pass `false`. + +- [ ] **Step 2: Add sort-based test cases** + +Add new tests that mirror the existing ones but with `sort_based: true`: + +```rust + #[test] + #[cfg_attr(miri, ignore)] + fn test_sort_based_basic() { + shuffle_write_test(1000, 100, 1, None, true); + shuffle_write_test(10000, 10, 1, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_sort_based_insert_larger_batch() { + shuffle_write_test(10000, 1, 16, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_sort_based_insert_smaller_batch() { + shuffle_write_test(1000, 1, 16, None, true); + shuffle_write_test(1000, 10, 16, None, true); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_sort_based_large_number_of_partitions() { + shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024), true); + shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024), true); + } +``` + +- [ ] **Step 3: Run the tests** + +Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle 2>&1 | tail -20` +Expected: All tests pass (both existing buffered and new sort-based) + +- [ ] **Step 4: Commit** + +```bash +git add native/shuffle/src/shuffle_writer.rs +git commit -m "test: add sort-based partitioner tests" +``` + +--- + +### Task 6: Add sort-based mode to benchmark binary + +**Files:** +- Modify: `native/shuffle/src/bin/shuffle_bench.rs` + +- [ ] **Step 1: Update benchmark to support sort-based mode** + +The existing benchmark already has a `--mode` CLI arg. Add `"sort"` as a new mode option. 
Find where mode is parsed and `ShuffleWriterExec` is created, and pass `sort_based: true` when mode is `"sort"`. + +Search for where `ShuffleWriterExec::try_new` is called in the benchmark file and add the `sort_based` parameter: + +```rust +let sort_based = args.mode == "sort"; +``` + +Pass `sort_based` to `ShuffleWriterExec::try_new`. + +- [ ] **Step 2: Verify it compiles** + +Run: `cargo build --release --features shuffle-bench --bin shuffle_bench --manifest-path native/Cargo.toml 2>&1 | tail -5` +Expected: Compiles + +- [ ] **Step 3: Commit** + +```bash +git add native/shuffle/src/bin/shuffle_bench.rs +git commit -m "feat: add sort mode to shuffle benchmark" +``` + +--- + +### Task 7: Wire up JVM side — add config and pass through to native + +**Files:** +- Modify: `common/src/main/java/org/apache/comet/CometConf.scala` (add config) +- Modify: `spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala` (pass config) +- Modify: `native/core/src/execution/planner.rs` (receive and forward) + +- [ ] **Step 1: Add config option in CometConf.scala** + +Find the existing shuffle-related configs (search for `SHUFFLE` or `shuffle`). Add a new config: + +```scala + val COMET_SHUFFLE_SORT_BASED: ConfigEntry[Boolean] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.sort_based") + .doc( + "When enabled, uses sort-based repartitioning for native shuffle. " + + "This avoids per-partition memory overhead from builders, making it more " + + "memory-efficient for large partition counts. Default is false (uses buffered mode).") + .booleanConf + .createWithDefault(false) +``` + +- [ ] **Step 2: Pass config through CometNativeShuffleWriter** + +In `CometNativeShuffleWriter.scala`, find where `ShuffleWriterExec` parameters are passed to native via protobuf or JNI. Add `sort_based` to the serialized plan. + +- [ ] **Step 3: Receive in planner.rs** + +In `native/core/src/execution/planner.rs`, find where `ShuffleWriterExec` is constructed. 
Extract the `sort_based` field from the protobuf and pass it to `ShuffleWriterExec::try_new`. + +- [ ] **Step 4: Update protobuf if needed** + +If the shuffle writer parameters are passed via protobuf (`native/proto/src/`), add a `sort_based` field to the relevant message. + +- [ ] **Step 5: Build and verify** + +Run: `make core` +Expected: Compiles + +- [ ] **Step 6: Commit** + +```bash +git add common/src/main/java/org/apache/comet/CometConf.scala \ + spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala \ + native/core/src/execution/planner.rs +git commit -m "feat: add sort_based shuffle config and JVM-native plumbing" +``` + +--- + +### Task 8: Run clippy and format + +**Files:** +- Potentially any Rust files modified above + +- [ ] **Step 1: Run cargo fmt** + +Run: `cd native && cargo fmt` + +- [ ] **Step 2: Run clippy** + +Run: `cd native && cargo clippy --all-targets --workspace -- -D warnings 2>&1 | tail -20` +Expected: No warnings + +- [ ] **Step 3: Fix any issues and commit** + +```bash +git add -A +git commit -m "chore: fix formatting and clippy warnings" +``` + +--- + +### Task 9: Run full test suite and benchmarks + +**Files:** None (verification only) + +- [ ] **Step 1: Run Rust tests** + +Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle` +Expected: All tests pass + +- [ ] **Step 2: Run benchmark comparison** + +Run the shuffle benchmark with all three modes and compare: + +```bash +./native/target/release/shuffle_bench \ + --input /opt/tpch/sf100/lineitem \ + --limit 100000000 \ + --partitions 200 \ + --codec lz4 \ + --hash-columns 0,3 \ + --memory-limit 4294000000 \ + --mode sort + +./native/target/release/shuffle_bench \ + --input /opt/tpch/sf100/lineitem \ + --limit 100000000 \ + --partitions 800 \ + --codec lz4 \ + --hash-columns 0,3 \ + --memory-limit 4294000000 \ + --mode sort +``` + +Compare throughput and peak memory against `buffered` mode. 
+ +- [ ] **Step 3: Run JVM tests** + +Run: `make && ./mvnw test -DwildcardSuites="CometNativeShuffle"` +Expected: All shuffle tests pass diff --git a/native/shuffle/src/partitioners/sort_based.rs b/native/shuffle/src/partitioners/sort_based.rs index e0322ef796..dee146f0c6 100644 --- a/native/shuffle/src/partitioners/sort_based.rs +++ b/native/shuffle/src/partitioners/sort_based.rs @@ -40,14 +40,13 @@ use tokio::time::Instant; pub(crate) struct SortBasedPartitioner { output_data_file: String, output_index_file: String, - schema: SchemaRef, partition_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, partitioning: CometPartitioning, runtime: Arc, metrics: ShufflePartitionerMetrics, batch_size: usize, - reservation: MemoryReservation, + /// Held to keep the memory reservation alive for the lifetime of this partitioner + _reservation: MemoryReservation, write_buffer_size: usize, hashes_buf: Vec, partition_ids: Vec, @@ -85,14 +84,12 @@ impl SortBasedPartitioner { Ok(Self { output_data_file, output_index_file, - schema, partition_writers, - shuffle_block_writer, partitioning, runtime, metrics, batch_size, - reservation, + _reservation: reservation, write_buffer_size, hashes_buf, partition_ids: vec![0u32; batch_size], From 7f17efa40090abd34b8ee348196ff6452dbf4cb5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:52:47 -0600 Subject: [PATCH 6/7] perf: use persistent BufBatchWriter per partition in SortBasedPartitioner Replace per-call BufBatchWriter creation with persistent per-partition writers that keep BatchCoalescer state across calls. This allows small partition slices to be coalesced to batch_size before encoding, dramatically reducing per-block IPC schema overhead. 
--- native/shuffle/src/partitioners/sort_based.rs | 113 +++++++++++++++++- 1 file changed, 107 insertions(+), 6 deletions(-) diff --git a/native/shuffle/src/partitioners/sort_based.rs b/native/shuffle/src/partitioners/sort_based.rs index dee146f0c6..dfd2d68ff4 100644 --- a/native/shuffle/src/partitioners/sort_based.rs +++ b/native/shuffle/src/partitioners/sort_based.rs @@ -17,12 +17,13 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::writers::PartitionWriter; +use crate::writers::BufBatchWriter; use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; use arrow::compute::take; use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; +use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; @@ -33,14 +34,106 @@ use std::io::{BufWriter, Seek, Write}; use std::sync::Arc; use tokio::time::Instant; +/// Per-partition writer that owns a persistent BufBatchWriter with BatchCoalescer, +/// so small batches are accumulated to batch_size before encoding. +struct PartitionSpillWriter { + /// The BufBatchWriter that coalesces and encodes batches. + /// None until the first batch is written to this partition. 
+ writer: Option>, + /// Temp file handle — kept alive so the file isn't deleted until we're done + _temp_file: Option, + /// Path to the spill file for copying in shuffle_write + spill_path: Option, +} + +impl PartitionSpillWriter { + fn new() -> Self { + Self { + writer: None, + _temp_file: None, + spill_path: None, + } + } + + fn ensure_writer( + &mut self, + runtime: &RuntimeEnv, + shuffle_block_writer: &ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, + ) -> datafusion::common::Result<()> { + if self.writer.is_none() { + let temp_file = runtime + .disk_manager + .create_tmp_file("sort shuffle spill")?; + let path = temp_file.path().to_path_buf(); + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&path) + .map_err(|e| { + DataFusionError::Execution(format!("Error creating spill file: {e}")) + })?; + self.writer = Some(BufBatchWriter::new( + shuffle_block_writer.clone(), + file, + write_buffer_size, + batch_size, + )); + self.spill_path = Some(path); + self._temp_file = Some(temp_file); + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn write_batch( + &mut self, + batch: &RecordBatch, + runtime: &RuntimeEnv, + shuffle_block_writer: &ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, + encode_time: &datafusion::physical_plan::metrics::Time, + write_time: &datafusion::physical_plan::metrics::Time, + ) -> datafusion::common::Result<()> { + self.ensure_writer(runtime, shuffle_block_writer, write_buffer_size, batch_size)?; + self.writer + .as_mut() + .unwrap() + .write(batch, encode_time, write_time)?; + Ok(()) + } + + fn flush( + &mut self, + encode_time: &datafusion::physical_plan::metrics::Time, + write_time: &datafusion::physical_plan::metrics::Time, + ) -> datafusion::common::Result<()> { + if let Some(writer) = &mut self.writer { + writer.flush(encode_time, write_time)?; + } + Ok(()) + } + + fn path(&self) -> Option<&std::path::Path> { + self.spill_path.as_deref() + } +} + 
/// A shuffle repartitioner that sorts each batch by partition ID using counting sort, /// then slices and writes per-partition sub-batches immediately. This avoids /// per-partition Arrow builders, so memory usage is O(batch_size) regardless of /// partition count. +/// +/// Each partition has a persistent BufBatchWriter with a BatchCoalescer that accumulates +/// small slices to batch_size before encoding, avoiding per-slice IPC schema overhead. pub(crate) struct SortBasedPartitioner { output_data_file: String, output_index_file: String, - partition_writers: Vec, + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, partitioning: CometPartitioning, runtime: Arc, metrics: ShufflePartitionerMetrics, @@ -71,8 +164,8 @@ impl SortBasedPartitioner { let num_output_partitions = partitioning.partition_count(); let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) - .collect::>>()?; + .map(|_| PartitionSpillWriter::new()) + .collect(); let reservation = MemoryConsumer::new(format!("SortBasedPartitioner[{partition}]")) .register(&runtime.memory_pool); let hashes_buf = match partitioning { @@ -85,6 +178,7 @@ impl SortBasedPartitioner { output_data_file, output_index_file, partition_writers, + shuffle_block_writer, partitioning, runtime, metrics, @@ -222,12 +316,14 @@ impl SortBasedPartitioner { continue; } let partition_batch = sorted_batch.slice(start, len); - self.partition_writers[partition_id].spill_batch( + self.partition_writers[partition_id].write_batch( &partition_batch, &self.runtime, - &self.metrics, + &self.shuffle_block_writer, self.write_buffer_size, self.batch_size, + &self.metrics.encode_time, + &self.metrics.write_time, )?; } Ok(()) @@ -260,6 +356,11 @@ impl ShufflePartitioner for SortBasedPartitioner { let data_file = self.output_data_file.clone(); let index_file = 
self.output_index_file.clone(); + // Flush all partition writers to ensure all data is written to spill files + for writer in &mut self.partition_writers { + writer.flush(&self.metrics.encode_time, &self.metrics.write_time)?; + } + let output_data = OpenOptions::new() .write(true) .create(true) From 3817837dfc3067729ed70ab7b56c87c506754668 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 17:53:52 -0600 Subject: [PATCH 7/7] chore: remove benchmark scripts and design doc from PR --- benchmarks/generate_charts.py | 75 -- benchmarks/run_shuffle_bench.sh | 49 -- .../2026-04-13-sort-based-repartitioning.md | 767 ------------------ 3 files changed, 891 deletions(-) delete mode 100644 benchmarks/generate_charts.py delete mode 100755 benchmarks/run_shuffle_bench.sh delete mode 100644 docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md diff --git a/benchmarks/generate_charts.py b/benchmarks/generate_charts.py deleted file mode 100644 index bea02765e6..0000000000 --- a/benchmarks/generate_charts.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 -"""Generate peak memory and throughput charts from shuffle benchmark results.""" - -import csv -import matplotlib.pyplot as plt -import numpy as np - -CSV_FILE = "/tmp/shuffle_bench_results_partitions/results.csv" -OUTPUT_DIR = "/tmp/shuffle_bench_results_partitions" - -rows = [] -with open(CSV_FILE) as f: - reader = csv.DictReader(f) - for row in reader: - rows.append(row) - -labels = [] -buffered_mem = [] -immediate_mem = [] -buffered_tp = [] -immediate_tp = [] - -for row in rows: - if row["mode"] == "buffered": - labels.append(str(row["partitions"])) - buffered_mem.append(int(row["peak_memory_bytes"]) / (1024**3)) - buffered_tp.append(int(row["throughput_rows_per_sec"]) / 1e6) - -for row in rows: - if row["mode"] == "immediate": - immediate_mem.append(int(row["peak_memory_bytes"]) / (1024**3)) - immediate_tp.append(int(row["throughput_rows_per_sec"]) / 1e6) - -x = np.arange(len(labels)) -width = 
0.35 - -plt.rcParams.update({ - "figure.facecolor": "white", - "axes.facecolor": "white", - "axes.grid": True, - "grid.alpha": 0.3, - "font.size": 12, -}) - -# Chart 1: Peak Memory -fig, ax = plt.subplots(figsize=(10, 6)) -bars1 = ax.bar(x - width/2, buffered_mem, width, label="buffered", color="#4C78A8") -bars2 = ax.bar(x + width/2, immediate_mem, width, label="immediate", color="#E45756") -ax.set_xlabel("Output Partitions") -ax.set_ylabel("Peak RSS (GiB)") -ax.set_title("Shuffle Peak Memory Usage\n(4 GB memory limit, 100M rows, lz4, hash on cols 0,3)") -ax.set_xticks(x) -ax.set_xticklabels(labels) -ax.legend() -ax.bar_label(bars1, fmt="%.1f", padding=3, fontsize=9) -ax.bar_label(bars2, fmt="%.1f", padding=3, fontsize=9) -fig.tight_layout() -fig.savefig(f"{OUTPUT_DIR}/shuffle_peak_memory.png", dpi=150) -print(f"Saved {OUTPUT_DIR}/shuffle_peak_memory.png") - -# Chart 2: Throughput -fig, ax = plt.subplots(figsize=(10, 6)) -bars1 = ax.bar(x - width/2, buffered_tp, width, label="buffered", color="#4C78A8") -bars2 = ax.bar(x + width/2, immediate_tp, width, label="immediate", color="#E45756") -ax.set_xlabel("Output Partitions") -ax.set_ylabel("Throughput (M rows/s)") -ax.set_title("Shuffle Throughput\n(4 GB memory limit, 100M rows, lz4, hash on cols 0,3)") -ax.set_xticks(x) -ax.set_xticklabels(labels) -ax.legend() -ax.bar_label(bars1, fmt="%.2f", padding=3, fontsize=9) -ax.bar_label(bars2, fmt="%.2f", padding=3, fontsize=9) -fig.tight_layout() -fig.savefig(f"{OUTPUT_DIR}/shuffle_throughput.png", dpi=150) -print(f"Saved {OUTPUT_DIR}/shuffle_throughput.png") diff --git a/benchmarks/run_shuffle_bench.sh b/benchmarks/run_shuffle_bench.sh deleted file mode 100755 index d3952343b1..0000000000 --- a/benchmarks/run_shuffle_bench.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash -# Run shuffle benchmarks: buffered vs immediate, varying partition counts, 4GB memory limit - -set -e - -BENCH_BIN="./native/target/release/shuffle_bench" -INPUT="/opt/tpch/sf100/lineitem" -LIMIT=100000000 
-CODEC="lz4" -HASH_COLUMNS="0,3" -MEMORY_LIMIT=4294000000 -RESULTS_DIR="/tmp/shuffle_bench_results_partitions" - -mkdir -p "$RESULTS_DIR" - -PARTITIONS=(200 400 800) -MODES=("buffered" "immediate") - -CSV_FILE="$RESULTS_DIR/results.csv" -echo "mode,partitions,peak_memory_bytes,throughput_rows_per_sec,avg_time_secs" > "$CSV_FILE" - -for mode in "${MODES[@]}"; do - for parts in "${PARTITIONS[@]}"; do - echo "=== Running: mode=$mode partitions=$parts ===" - OUT_FILE="$RESULTS_DIR/${mode}_${parts}.txt" - TIME_FILE="$RESULTS_DIR/${mode}_${parts}_time.txt" - - /usr/bin/time -l "$BENCH_BIN" \ - --input "$INPUT" \ - --limit "$LIMIT" \ - --partitions "$parts" \ - --codec "$CODEC" \ - --hash-columns "$HASH_COLUMNS" \ - --memory-limit "$MEMORY_LIMIT" \ - --mode "$mode" \ - > "$OUT_FILE" 2> "$TIME_FILE" - - peak_mem=$(grep "maximum resident set size" "$TIME_FILE" | awk '{print $1}') - throughput=$(grep "throughput:" "$OUT_FILE" | sed 's/.*throughput: *//; s/ rows\/s.*//' | tr -d ',') - avg_time=$(grep "avg time:" "$OUT_FILE" | sed 's/.*avg time: *//; s/s$//') - - echo " Peak memory: $peak_mem bytes, Throughput: $throughput rows/s, Avg time: ${avg_time}s" - echo "$mode,$parts,$peak_mem,$throughput,$avg_time" >> "$CSV_FILE" - done -done - -echo "" -echo "Results saved to $CSV_FILE" -cat "$CSV_FILE" diff --git a/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md b/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md deleted file mode 100644 index 8032656bbc..0000000000 --- a/docs/superpowers/plans/2026-04-13-sort-based-repartitioning.md +++ /dev/null @@ -1,767 +0,0 @@ -# Sort-Based Shuffle Repartitioning Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 
- -**Goal:** Add a sort-based shuffle repartitioner that partitions each batch on-the-fly using counting sort + Arrow `take`/`slice`, avoiding per-partition builder memory overhead. - -**Architecture:** A new `SortBasedPartitioner` implements `ShufflePartitioner`. On each `insert_batch`, it computes partition IDs, counting-sorts the batch by partition, slices it into per-partition sub-batches, and writes compressed IPC blocks to per-partition `PartitionWriter`s. Memory usage is O(batch_size) not O(batch_size x num_partitions). On `shuffle_write`, it concatenates per-partition spill files into the final data+index files. - -**Tech Stack:** Rust, Arrow (`arrow::compute::take`), existing shuffle infrastructure (`ShuffleBlockWriter`, `PartitionWriter`, `BufBatchWriter`) - ---- - -### Task 1: Create `SortBasedPartitioner` struct and constructor - -**Files:** -- Create: `native/shuffle/src/partitioners/sort_based.rs` -- Modify: `native/shuffle/src/partitioners/mod.rs` - -- [ ] **Step 1: Create the module file with struct definition** - -Create `native/shuffle/src/partitioners/sort_based.rs`: - -```rust -use crate::metrics::ShufflePartitionerMetrics; -use crate::partitioners::ShufflePartitioner; -use crate::writers::PartitionWriter; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; -use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; -use arrow::compute::take; -use arrow::datatypes::SchemaRef; -use datafusion::common::DataFusionError; -use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::metrics::Time; -use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Seek, Write}; -use std::sync::Arc; -use tokio::time::Instant; - -/// A shuffle repartitioner that sorts each batch by partition ID using counting 
sort, -/// then slices and writes per-partition sub-batches immediately. This avoids -/// per-partition Arrow builders, so memory usage is O(batch_size) regardless of -/// partition count. -pub(crate) struct SortBasedPartitioner { - output_data_file: String, - output_index_file: String, - schema: SchemaRef, - /// One writer per output partition (manages spill files) - partition_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, - partitioning: CometPartitioning, - runtime: Arc, - metrics: ShufflePartitionerMetrics, - /// The configured batch size - batch_size: usize, - /// Memory reservation for tracking - reservation: MemoryReservation, - /// Size of the write buffer in bytes - write_buffer_size: usize, - /// Reusable scratch buffers - hashes_buf: Vec, - partition_ids: Vec, - sorted_indices: Vec, - /// Partition boundary offsets: partition_starts[i]..partition_starts[i+1] - /// gives the range in sorted_indices for partition i - partition_starts: Vec, -} -``` - -- [ ] **Step 2: Implement constructor** - -Add to the same file: - -```rust -impl SortBasedPartitioner { - #[allow(clippy::too_many_arguments)] - pub(crate) fn try_new( - partition: usize, - output_data_file: String, - output_index_file: String, - schema: SchemaRef, - partitioning: CometPartitioning, - metrics: ShufflePartitionerMetrics, - runtime: Arc, - batch_size: usize, - codec: CompressionCodec, - write_buffer_size: usize, - ) -> datafusion::common::Result { - let num_output_partitions = partitioning.partition_count(); - - let shuffle_block_writer = - ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) - .collect::>>()?; - - let reservation = - MemoryConsumer::new(format!("SortBasedPartitioner[{partition}]")) - .register(&runtime.memory_pool); - - let hashes_buf = match partitioning { - CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { - 
vec![0u32; batch_size] - } - _ => vec![], - }; - - Ok(Self { - output_data_file, - output_index_file, - schema, - partition_writers, - shuffle_block_writer, - partitioning, - runtime, - metrics, - batch_size, - reservation, - write_buffer_size, - hashes_buf, - partition_ids: vec![0u32; batch_size], - sorted_indices: vec![0u32; batch_size], - partition_starts: vec![0usize; num_output_partitions + 1], - }) - } -} -``` - -- [ ] **Step 3: Register module in mod.rs** - -Modify `native/shuffle/src/partitioners/mod.rs` — add after `mod single_partition;`: - -```rust -mod sort_based; -``` - -And add to the pub use section: - -```rust -pub(crate) use sort_based::SortBasedPartitioner; -``` - -- [ ] **Step 4: Verify it compiles** - -Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` -Expected: Compiles (with dead code warnings, which is fine) - -- [ ] **Step 5: Commit** - -```bash -git add native/shuffle/src/partitioners/sort_based.rs native/shuffle/src/partitioners/mod.rs -git commit -m "feat: add SortBasedPartitioner struct and constructor" -``` - ---- - -### Task 2: Implement partition ID computation and counting sort - -**Files:** -- Modify: `native/shuffle/src/partitioners/sort_based.rs` - -- [ ] **Step 1: Add the counting sort method** - -This method computes partition IDs for each row, then uses counting sort to produce sorted indices and partition boundary offsets. Add to the `impl SortBasedPartitioner` block: - -```rust - /// Compute partition IDs for each row and counting-sort the indices by partition. - /// After this call: - /// - `self.sorted_indices[self.partition_starts[p]..self.partition_starts[p+1]]` - /// contains the row indices belonging to partition `p`. 
- fn compute_partition_ids_and_sort( - &mut self, - input: &RecordBatch, - ) -> datafusion::common::Result<()> { - let num_rows = input.num_rows(); - let num_partitions = self.partitioning.partition_count(); - - match &self.partitioning { - CometPartitioning::Hash(exprs, num_output_partitions) => { - let arrays = exprs - .iter() - .map(|expr| expr.evaluate(input)?.into_array(num_rows)) - .collect::>>()?; - - let hashes_buf = &mut self.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - create_murmur3_hashes(&arrays, hashes_buf)?; - - let partition_ids = &mut self.partition_ids[..num_rows]; - for (idx, hash) in hashes_buf.iter().enumerate() { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - } - } - CometPartitioning::RangePartitioning( - lex_ordering, - _num_output_partitions, - row_converter, - bounds, - ) => { - let arrays = lex_ordering - .iter() - .map(|expr| expr.expr.evaluate(input)?.into_array(num_rows)) - .collect::>>()?; - - let row_batch = row_converter.convert_columns(arrays.as_slice())?; - let partition_ids = &mut self.partition_ids[..num_rows]; - row_batch.iter().enumerate().for_each(|(row_idx, row)| { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32; - }); - } - CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { - let num_columns_to_hash = if *max_hash_columns == 0 { - input.num_columns() - } else { - (*max_hash_columns).min(input.num_columns()) - }; - let columns_to_hash: Vec = (0..num_columns_to_hash) - .map(|i| Arc::clone(input.column(i))) - .collect(); - - let hashes_buf = &mut self.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - create_murmur3_hashes(&columns_to_hash, hashes_buf)?; - - let partition_ids = &mut self.partition_ids[..num_rows]; - for (idx, hash) in hashes_buf.iter().enumerate() { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - } - } - other => { - return 
Err(DataFusionError::NotImplemented(format!( - "Unsupported shuffle partitioning scheme {other:?}" - ))); - } - } - - // Counting sort: count rows per partition - let partition_starts = &mut self.partition_starts[..num_partitions + 1]; - partition_starts.fill(0); - let partition_ids = &self.partition_ids[..num_rows]; - for &pid in partition_ids.iter() { - partition_starts[pid as usize + 1] += 1; - } - - // Prefix sum to get start offsets - for i in 1..=num_partitions { - partition_starts[i] += partition_starts[i - 1]; - } - - // Place indices into sorted order - let sorted_indices = &mut self.sorted_indices[..num_rows]; - // Advance a mutable copy of the start offsets as per-partition cursors, - // leaving partition_starts itself intact for slicing later - let mut cursors = partition_starts.to_vec(); - for (row_idx, &pid) in partition_ids.iter().enumerate() { - let pos = cursors[pid as usize]; - sorted_indices[pos] = row_idx as u32; - cursors[pid as usize] += 1; - } - - Ok(()) - } -``` - -- [ ] **Step 2: Verify it compiles** - -Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` -Expected: Compiles - -- [ ] **Step 3: Commit** - -```bash -git add native/shuffle/src/partitioners/sort_based.rs -git commit -m "feat: add partition ID computation and counting sort" -``` - ---- - -### Task 3: Implement `insert_batch` — sort, slice, and write per-partition - -**Files:** -- Modify: `native/shuffle/src/partitioners/sort_based.rs` - -- [ ] **Step 1: Add the process_batch method** - -This is the core method that sorts a single batch by partition and writes each partition's slice. Add to the `impl SortBasedPartitioner` block: - -```rust - /// Process a single batch: compute partition IDs, sort, slice by partition, - /// and write each slice to the corresponding partition writer.
- fn process_batch(&mut self, input: RecordBatch) -> datafusion::common::Result<()> { - if input.num_rows() == 0 { - return Ok(()); - } - - let num_rows = input.num_rows(); - let num_partitions = self.partitioning.partition_count(); - - // Update metrics - self.metrics.data_size.add(input.get_array_memory_size()); - self.metrics.baseline.record_output(num_rows); - - // Phase 1: Compute partition IDs and counting-sort indices - { - let mut timer = self.metrics.repart_time.timer(); - self.compute_partition_ids_and_sort(&input)?; - timer.stop(); - } - - // Phase 2: Reorder the batch using take, then slice by partition boundaries - let sorted_indices = &self.sorted_indices[..num_rows]; - let partition_starts = &self.partition_starts[..num_partitions + 1]; - - let indices_array = UInt32Array::from_iter_values(sorted_indices.iter().copied()); - let sorted_batch = RecordBatch::try_new( - input.schema(), - input - .columns() - .iter() - .map(|col| take(col, &indices_array, None)) - .collect::, _>>()?, - )?; - - // Phase 3: Slice the sorted batch at partition boundaries and write each slice - for partition_id in 0..num_partitions { - let start = partition_starts[partition_id]; - let end = partition_starts[partition_id + 1]; - let len = end - start; - if len == 0 { - continue; - } - - let partition_batch = sorted_batch.slice(start, len); - self.partition_writers[partition_id].spill_batch( - &partition_batch, - &self.runtime, - &self.metrics, - self.write_buffer_size, - self.batch_size, - )?; - } - - Ok(()) - } -``` - -- [ ] **Step 2: Add `spill_batch` method to `PartitionWriter`** - -Modify `native/shuffle/src/writers/spill.rs` to add a method that writes a single batch (not an iterator). Add after the existing `spill` method: - -```rust - /// Write a single batch to this partition's spill file. 
- pub(crate) fn spill_batch( - &mut self, - batch: &RecordBatch, - runtime: &RuntimeEnv, - metrics: &ShufflePartitionerMetrics, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result { - if batch.num_rows() == 0 { - return Ok(0); - } - self.ensure_spill_file_created(runtime)?; - - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - batch_size, - ); - let bytes_written = - buf_batch_writer.write(batch, &metrics.encode_time, &metrics.write_time)?; - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; - Ok(bytes_written) - } -``` - -- [ ] **Step 3: Implement the `ShufflePartitioner` trait** - -Add to `sort_based.rs`: - -```rust -#[async_trait::async_trait] -impl ShufflePartitioner for SortBasedPartitioner { - async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { - let start_time = Instant::now(); - let mut start = 0; - while start < batch.num_rows() { - let end = (start + self.batch_size).min(batch.num_rows()); - let slice = batch.slice(start, end - start); - self.process_batch(slice)?; - start = end; - } - self.metrics.input_batches.add(1); - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - } - - fn shuffle_write(&mut self) -> datafusion::common::Result<()> { - let start_time = Instant::now(); - let num_output_partitions = self.partition_writers.len(); - let mut offsets = vec![0i64; num_output_partitions + 1]; - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - - let mut output_data = BufWriter::new(output_data); - - for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()? 
as i64; - - // Copy spill file contents to final data file - if let Some(spill_path) = self.partition_writers[i].path() { - let mut spill_file = File::open(spill_path)?; - let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); - } - } - - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - write_timer.stop(); - - offsets[num_output_partitions] = output_data.stream_position()? as i64; - - // Write index file - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); - for offset in offsets { - output_index.write_all(&offset.to_le_bytes()[..])?; - } - output_index.flush()?; - write_timer.stop(); - - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - - Ok(()) - } -} - -impl Debug for SortBasedPartitioner { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("SortBasedPartitioner") - .field("partitions", &self.partition_writers.len()) - .finish() - } -} -``` - -- [ ] **Step 4: Verify it compiles** - -Run: `cargo build --manifest-path native/Cargo.toml 2>&1 | tail -5` -Expected: Compiles - -- [ ] **Step 5: Commit** - -```bash -git add native/shuffle/src/partitioners/sort_based.rs native/shuffle/src/writers/spill.rs -git commit -m "feat: implement insert_batch and shuffle_write for SortBasedPartitioner" -``` - ---- - -### Task 4: Wire up `SortBasedPartitioner` in the shuffle writer - -**Files:** -- Modify: `native/shuffle/src/shuffle_writer.rs` - -- [ ] **Step 1: Add sort-based mode to the shuffle_write function** - -In `native/shuffle/src/shuffle_writer.rs`, the `shuffle_write` function (line ~196) currently constructs a `MultiPartitionShuffleRepartitioner` as the default for multi-partition cases. 
We need to add a `sort_based` boolean parameter and use `SortBasedPartitioner` when true. - -First, update the `ShuffleWriterExec` struct to include the new flag. Add field after `write_buffer_size`: - -```rust - /// Whether to use sort-based repartitioning - sort_based: bool, -``` - -Update `try_new` to accept the new parameter (add `sort_based: bool` as the last parameter) and store it in the struct. - -Update `with_children` to pass `self.sort_based` through. - -Then modify the `shuffle_write` function signature to add `sort_based: bool` parameter, and update the match arm for multi-partition: - -```rust - _ if sort_based => Box::new(SortBasedPartitioner::try_new( - partition, - output_data_file, - output_index_file, - Arc::clone(&schema), - partitioning, - metrics, - context.runtime_env(), - context.session_config().batch_size(), - codec, - write_buffer_size, - )?), - _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( -``` - -Add the import at the top of the file: - -```rust -use crate::partitioners::SortBasedPartitioner; -``` - -- [ ] **Step 2: Update existing tests to pass `sort_based: false`** - -All existing `ShuffleWriterExec::try_new` calls in tests need the new `false` parameter added. - -- [ ] **Step 3: Verify it compiles and existing tests pass** - -Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle 2>&1 | tail -20` -Expected: All existing tests pass - -- [ ] **Step 4: Commit** - -```bash -git add native/shuffle/src/shuffle_writer.rs -git commit -m "feat: wire SortBasedPartitioner into shuffle writer" -``` - ---- - -### Task 5: Add tests for sort-based partitioner - -**Files:** -- Modify: `native/shuffle/src/shuffle_writer.rs` (test module) - -- [ ] **Step 1: Create a parameterized test helper** - -Add a new test helper that runs the existing `shuffle_write_test` logic but with `sort_based: true`. 
The simplest approach is to parameterize the existing `shuffle_write_test` function: - -Rename the existing `shuffle_write_test` to accept a `sort_based` parameter: - -```rust - fn shuffle_write_test( - batch_size: usize, - num_batches: usize, - num_partitions: usize, - memory_limit: Option<usize>, - sort_based: bool, - ) { -``` - -And pass it through to `ShuffleWriterExec::try_new`. - -Update all existing callers to pass `false`. - -- [ ] **Step 2: Add sort-based test cases** - -Add new tests that mirror the existing ones but with `sort_based: true`: - -```rust - #[test] - #[cfg_attr(miri, ignore)] - fn test_sort_based_basic() { - shuffle_write_test(1000, 100, 1, None, true); - shuffle_write_test(10000, 10, 1, None, true); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_sort_based_insert_larger_batch() { - shuffle_write_test(10000, 1, 16, None, true); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_sort_based_insert_smaller_batch() { - shuffle_write_test(1000, 1, 16, None, true); - shuffle_write_test(1000, 10, 16, None, true); - } - - #[test] - #[cfg_attr(miri, ignore)] - fn test_sort_based_large_number_of_partitions() { - shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024), true); - shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024), true); - } -``` - -- [ ] **Step 3: Run the tests** - -Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle 2>&1 | tail -20` -Expected: All tests pass (both existing buffered and new sort-based) - -- [ ] **Step 4: Commit** - -```bash -git add native/shuffle/src/shuffle_writer.rs -git commit -m "test: add sort-based partitioner tests" -``` - ---- - -### Task 6: Add sort-based mode to benchmark binary - -**Files:** -- Modify: `native/shuffle/src/bin/shuffle_bench.rs` - -- [ ] **Step 1: Update benchmark to support sort-based mode** - -The existing benchmark already has a `--mode` CLI arg. Add `"sort"` as a new mode option. 
Find where mode is parsed and `ShuffleWriterExec` is created, and pass `sort_based: true` when mode is `"sort"`. - -Search for where `ShuffleWriterExec::try_new` is called in the benchmark file and add the `sort_based` parameter: - -```rust -let sort_based = args.mode == "sort"; -``` - -Pass `sort_based` to `ShuffleWriterExec::try_new`. - -- [ ] **Step 2: Verify it compiles** - -Run: `cargo build --release --features shuffle-bench --bin shuffle_bench --manifest-path native/Cargo.toml 2>&1 | tail -5` -Expected: Compiles - -- [ ] **Step 3: Commit** - -```bash -git add native/shuffle/src/bin/shuffle_bench.rs -git commit -m "feat: add sort mode to shuffle benchmark" -``` - ---- - -### Task 7: Wire up JVM side — add config and pass through to native - -**Files:** -- Modify: `common/src/main/java/org/apache/comet/CometConf.scala` (add config) -- Modify: `spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala` (pass config) -- Modify: `native/core/src/execution/planner.rs` (receive and forward) - -- [ ] **Step 1: Add config option in CometConf.scala** - -Find the existing shuffle-related configs (search for `SHUFFLE` or `shuffle`). Add a new config: - -```scala - val COMET_SHUFFLE_SORT_BASED: ConfigEntry[Boolean] = - conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.sort_based") - .doc( - "When enabled, uses sort-based repartitioning for native shuffle. " + - "This avoids per-partition memory overhead from builders, making it more " + - "memory-efficient for large partition counts. Default is false (uses buffered mode).") - .booleanConf - .createWithDefault(false) -``` - -- [ ] **Step 2: Pass config through CometNativeShuffleWriter** - -In `CometNativeShuffleWriter.scala`, find where `ShuffleWriterExec` parameters are passed to native via protobuf or JNI. Add `sort_based` to the serialized plan. - -- [ ] **Step 3: Receive in planner.rs** - -In `native/core/src/execution/planner.rs`, find where `ShuffleWriterExec` is constructed. 
Extract the `sort_based` field from the protobuf and pass it to `ShuffleWriterExec::try_new`. - -- [ ] **Step 4: Update protobuf if needed** - -If the shuffle writer parameters are passed via protobuf (`native/proto/src/`), add a `sort_based` field to the relevant message. - -- [ ] **Step 5: Build and verify** - -Run: `make core` -Expected: Compiles - -- [ ] **Step 6: Commit** - -```bash -git add common/src/main/java/org/apache/comet/CometConf.scala \ - spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala \ - native/core/src/execution/planner.rs -git commit -m "feat: add sort_based shuffle config and JVM-native plumbing" -``` - ---- - -### Task 8: Run clippy and format - -**Files:** -- Potentially any Rust files modified above - -- [ ] **Step 1: Run cargo fmt** - -Run: `cd native && cargo fmt` - -- [ ] **Step 2: Run clippy** - -Run: `cd native && cargo clippy --all-targets --workspace -- -D warnings 2>&1 | tail -20` -Expected: No warnings - -- [ ] **Step 3: Fix any issues and commit** - -```bash -git add -A -git commit -m "chore: fix formatting and clippy warnings" -``` - ---- - -### Task 9: Run full test suite and benchmarks - -**Files:** None (verification only) - -- [ ] **Step 1: Run Rust tests** - -Run: `cargo test --manifest-path native/Cargo.toml -p datafusion-comet-shuffle` -Expected: All tests pass - -- [ ] **Step 2: Run benchmark comparison** - -Run the shuffle benchmark with all three modes and compare: - -```bash -./native/target/release/shuffle_bench \ - --input /opt/tpch/sf100/lineitem \ - --limit 100000000 \ - --partitions 200 \ - --codec lz4 \ - --hash-columns 0,3 \ - --memory-limit 4294000000 \ - --mode sort - -./native/target/release/shuffle_bench \ - --input /opt/tpch/sf100/lineitem \ - --limit 100000000 \ - --partitions 800 \ - --codec lz4 \ - --hash-columns 0,3 \ - --memory-limit 4294000000 \ - --mode sort -``` - -Compare throughput and peak memory against `buffered` mode. 
- -- [ ] **Step 3: Run JVM tests** - -Run: `make && ./mvnw test -DwildcardSuites="CometNativeShuffle"` -Expected: All shuffle tests pass