feat: add standalone shuffle benchmark tool #3752
Conversation
Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling.
- …arquet
- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit the number of batches buffered before spilling, allowing earlier spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close the spill file FD after each spill and reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of loading all input data into memory; remove synthetic data generation
- Add `--max-buffered-batches` CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
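The too-many-open-files fix above can be sketched as follows. This is a hypothetical helper (`append_spill` is not the actual Comet code) showing the pattern: reopen the spill file in append mode for each spill and drop the handle afterwards, so no FD stays open per partition between spills.

```rust
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;

// Instead of holding one open FD per partition for the lifetime of the
// writer, reopen the spill file in append mode for each spill, then let
// the handle drop so the FD is released between spills.
fn append_spill(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)?;
    file.write_all(bytes)?;
    Ok(()) // `file` is dropped here, closing the FD
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("spill_demo.bin");
    let _ = std::fs::remove_file(&path);
    // Two spills: each call opens, appends, and closes the file.
    append_spill(&path, b"batch-1")?;
    append_spill(&path, b"batch-2")?;
    let data = std::fs::read(&path)?;
    assert_eq!(data, b"batch-1batch-2".to_vec());
    std::fs::remove_file(&path)?;
    Ok(())
}
```

The trade-off is one extra open/close syscall per spill, which is negligible compared to the spill I/O itself and keeps the FD count independent of partition count.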
Merge latest from apache/main, resolve conflicts, and strip out COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing. This branch now only adds the shuffle benchmark binary.
Spawns N parallel shuffle tasks to simulate executor parallelism. Each task reads the same input and writes to its own output files. Extracts the core shuffle logic into a shared async helper to avoid code duplication between the single and concurrent paths.
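The task-spawning pattern described above can be sketched as follows. This is a simplified, synchronous stand-in using `std::thread` (the actual benchmark uses a shared async helper), with a summation as a placeholder for the per-task shuffle write; `run_concurrent_tasks` is a hypothetical name.

```rust
use std::sync::Arc;
use std::thread;

// N tasks share the same (read-only) input; each produces its own
// per-task result, so there is no contention on the write path.
fn run_concurrent_tasks(num_tasks: usize, input: Arc<Vec<u64>>) -> Vec<u64> {
    let handles: Vec<_> = (0..num_tasks)
        .map(|task_id| {
            let input = Arc::clone(&input);
            thread::spawn(move || {
                // Stand-in for the shuffle write: consume the shared
                // input, produce a per-task output.
                input.iter().sum::<u64>() + task_id as u64
            })
        })
        .collect();
    // Join in spawn order so results line up with task ids.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let input = Arc::new(vec![1u64, 2, 3]);
    let results = run_concurrent_tasks(4, input);
    assert_eq!(results, vec![6, 7, 8, 9]);
}
```

Sharing the input behind an `Arc` mirrors the benchmark's design: every task reads the same data, so only the write side scales with `--concurrent-tasks`.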
Force-pushed from cf56e39 to c469077
mbutrovich left a comment
Minor comments so far. Thanks for working on this @andygrove!
| Flag | Default | Description |
|---|---|---|
| `--zstd-level` | `1` | Zstd compression level (1–22) |
| `--batch-size` | `8192` | Batch size for reading Parquet data |
| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded |
| `--max-buffered-batches` | `0` | Max batches to buffer before spilling (0 = memory-pool-only) |
Is this used or an old argument? I don't see it in Args anywhere.
### Basic usage

```sh
cargo run --release --features shuffle-bench --bin shuffle_bench -- \
```
I don't think we need to hide this behind a features flag. If the intention is that a dev need not build it every time, then I guess that is ok. Might be useful to add a target in the Makefile.
Or, perhaps, we can have a tools directory (maybe under dev) where we can add standalone tools and a target in the Makefile to build the tools directory.
The benchmark brings in additional dependencies (clap and parquet) so increases build time. That was the motivation for using a feature flag.
I like the feature flag, FWIW.
````rust
//! # Usage
//!
//! ```sh
//! cargo run --release --bin shuffle_bench -- \
````
Do we need to specify the features flag here?
```rust
})
.collect();

let data_bytes = fs::read(data_file).expect("Failed to read data file");
```
The data_file could be large. Would reading the entire file into memory emulate the behavior when we read the file in production? (For large files, we probably would/should use a buffered read.)
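A minimal sketch of the buffered-read approach this comment suggests, assuming a fixed 64 KiB buffer; `read_buffered` is a hypothetical helper, not part of the benchmark.

```rust
use std::fs::File;
use std::io::{BufReader, Read};

// Rather than fs::read() pulling the whole file into memory, stream it
// through a fixed-size buffer so peak memory stays bounded regardless
// of file size. Returns the total number of bytes seen.
fn read_buffered(path: &std::path::Path) -> std::io::Result<u64> {
    let mut reader = BufReader::with_capacity(64 * 1024, File::open(path)?);
    let mut chunk = [0u8; 8192];
    let mut total = 0u64;
    loop {
        let n = reader.read(&mut chunk)?;
        if n == 0 {
            break; // EOF
        }
        total += n as u64; // process `chunk[..n]` here
    }
    Ok(total)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("bufread_demo.bin");
    std::fs::write(&path, vec![0u8; 100_000])?;
    assert_eq!(read_buffered(&path)?, 100_000);
    std::fs::remove_file(&path)?;
    Ok(())
}
```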
I reverted the shuffle read part of the benchmark so that this PR is just for profiling shuffle write (which is the complex part). We can add a shuffle read benchmark in the future as and when we need it.
Revert new shuffle metrics (interleave_time, coalesce_time, memcopy_time) to keep PR focused on the benchmark tool. Remove read-back functionality from shuffle_bench to focus on write performance. Remove undocumented --max-buffered-batches option from README.
parthchandra left a comment
This looks good to me, pending the outstanding comment being addressed and CI passing. (I tried re-running the failed pipeline, but it failed to start.)
Thanks for the approval @parthchandra. I'm not clear on which comment still needs to be addressed. @mbutrovich Could you take another look? You requested changes in your last review.
mbutrovich left a comment
Thanks for putting this together, @andygrove!
There was a comment from Matt that was still pending at that time. All good now.
Which issue does this PR close?
N/A
Rationale for this change
Profiling and optimizing Comet's native shuffle writer requires running it in isolation outside of Spark. This PR adds a standalone benchmark binary that streams data from Parquet files through the shuffle writer, and adds finer-grained metrics to identify bottlenecks.
What changes are included in this PR?
Standalone shuffle benchmark (`shuffle_bench`)

A new binary in the shuffle crate for benchmarking shuffle write and read performance outside of Spark. Streams input directly from Parquet files.
Key features:

- Concurrent shuffle tasks (`--concurrent-tasks`)

Finer-grained shuffle metrics
Added three new timing metrics to `ShufflePartitionerMetrics` and threaded them through the existing shuffle writers:

- `interleave_time`: time spent in `interleave_record_batch` gathering rows into shuffled batches
- `coalesce_time`: time spent coalescing small batches before serialization
- `memcopy_time`: time spent buffering partition indices and memory accounting

These metrics are reported by the benchmark tool to help identify which phase of shuffle writing is the bottleneck.
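A sketch of how such phase timers can be accumulated: wrap each timed phase with `Instant::now()`/`elapsed()` and add the duration into a per-metric counter. `PhaseTimer` is a hypothetical stand-in for the counters in `ShufflePartitionerMetrics`, not the actual implementation.

```rust
use std::time::{Duration, Instant};

// Accumulates wall-clock time across repeated invocations of one phase.
#[derive(Default)]
struct PhaseTimer {
    total: Duration,
}

impl PhaseTimer {
    // Time a single invocation of the phase and pass its result through.
    fn time<T>(&mut self, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        self.total += start.elapsed();
        out
    }
}

fn main() {
    let mut interleave_time = PhaseTimer::default();
    // Stand-in for a timed phase such as interleave_record_batch.
    let shuffled: Vec<i32> = interleave_time.time(|| (0..1000).rev().collect());
    assert_eq!(shuffled.len(), 1000);
    // sleep() guarantees at least this much elapsed time is recorded.
    interleave_time.time(|| std::thread::sleep(Duration::from_millis(5)));
    assert!(interleave_time.total >= Duration::from_millis(5));
}
```

Because each metric only accumulates the span it wraps, comparing the three totals against the overall write time shows which phase dominates.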
How are these changes tested?
Manual benchmarking with TPC-H SF100 data. The benchmark binary is gated behind the `shuffle-bench` cargo feature flag and does not affect production builds. Existing shuffle tests continue to pass.