diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 7544f7ae26d4..93b7b1809d52 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -99,12 +99,14 @@ use tokio::runtime::Runtime;
 /// models 8 partition plan (should it be 16??)
 const NUM_STREAMS: usize = 8;
 
-/// The size of each batch within each stream
-const BATCH_SIZE: usize = 1024;
-
-/// Input sizes to benchmark. The small size (100K) exercises the
-/// in-memory concat-and-sort path; the large size (10M) exercises
-/// the sort-then-merge path with high fan-in.
+/// The size of each batch within each stream. 8192 rows matches DataFusion's
+/// default target batch size and better models the batch size seen by SortExec
+/// in normal query execution.
+const BATCH_SIZE: usize = 8192;
+
+/// Input sizes to benchmark. The small size (100K) exercises smaller in-memory
+/// sorts; the large size (1M) exercises the sort-then-merge path with higher
+/// fan-in.
 const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")];
 
 type PartitionedBatches = Vec<Vec<RecordBatch>>;
@@ -200,6 +202,47 @@ fn criterion_benchmark(c: &mut Criterion) {
                 b.iter(move || case.run())
             });
         }
+
+        let key_only_cases: Vec<(&str, StreamGenerator)> = vec![
+            (
+                "i64 key utf8 payload",
+                Box::new(move |sorted| i64_key_utf8_payload_streams(sorted, input_size)),
+            ),
+            (
+                "i64 key dictionary payload",
+                Box::new(move |sorted| {
+                    i64_key_dictionary_payload_streams(sorted, input_size)
+                }),
+            ),
+        ];
+
+        for (name, f) in &key_only_cases {
+            let sort_columns = &["key"];
+
+            c.bench_function(&format!("merge sorted {name} {size_label}"), |b| {
+                let data = f(true);
+                let case = BenchCase::merge_sorted_by_columns(&data, sort_columns);
+                b.iter(move || case.run())
+            });
+
+            c.bench_function(&format!("sort merge {name} {size_label}"), |b| {
+                let data = f(false);
+                let case = BenchCase::sort_merge_by_columns(&data, sort_columns);
+                b.iter(move || case.run())
+            });
+
+            c.bench_function(&format!("sort {name} {size_label}"), |b| {
+                let data = f(false);
+                let case = BenchCase::sort_by_columns(&data, sort_columns);
+                b.iter(move || case.run())
+            });
+
+            c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| {
+                let data = f(false);
+                let case = BenchCase::sort_partitioned_by_columns(&data, sort_columns);
+                b.iter(move || case.run())
+            });
+        }
     }
 }
 
@@ -295,6 +338,90 @@ impl BenchCase {
         }
     }
 
+    fn merge_sorted_by_columns(
+        partitions: &[Vec<RecordBatch>],
+        sort_columns: &[&str],
+    ) -> Self {
+        let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let schema = partitions[0][0].schema();
+        let sort = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);
+
+        let exec = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
+        let plan = Arc::new(SortPreservingMergeExec::new(sort, exec));
+
+        Self {
+            runtime,
+            task_ctx,
+            plan,
+        }
+    }
+
+    fn sort_merge_by_columns(
+        partitions: &[Vec<RecordBatch>],
+        sort_columns: &[&str],
+    ) -> Self {
+        let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let schema = partitions[0][0].schema();
+        let sort = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);
+
+        let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
+        let exec = SortExec::new(sort.clone(), source).with_preserve_partitioning(true);
+        let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        Self {
+            runtime,
+            task_ctx,
+            plan,
+        }
+    }
+
+    fn sort_by_columns(partitions: &[Vec<RecordBatch>], sort_columns: &[&str]) -> Self {
+        let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let schema = partitions[0][0].schema();
+        let sort = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);
+
+        let exec = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
+        let exec = Arc::new(CoalescePartitionsExec::new(exec));
+        let plan = Arc::new(SortExec::new(sort, exec));
+
+        Self {
+            runtime,
+            task_ctx,
+            plan,
+        }
+    }
+
+    fn sort_partitioned_by_columns(
+        partitions: &[Vec<RecordBatch>],
+        sort_columns: &[&str],
+    ) -> Self {
+        let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let schema = partitions[0][0].schema();
+        let sort = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);
+
+        let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
+        let exec = SortExec::new(sort, source).with_preserve_partitioning(true);
+        let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
+
+        Self {
+            runtime,
+            task_ctx,
+            plan,
+        }
+    }
+
     /// runs the specified plan to completion, draining all input and
     /// panic'ing on error
     fn run(&self) {
@@ -321,6 +448,14 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering {
     LexOrdering::new(sort_exprs).unwrap()
 }
 
+/// Make sort exprs for the specified columns in `schema`
+fn make_sort_exprs_for_columns(schema: &Schema, sort_columns: &[&str]) -> LexOrdering {
+    let sort_exprs = sort_columns
+        .iter()
+        .map(|name| PhysicalSortExpr::new_default(col(name, schema).unwrap()));
+    LexOrdering::new(sort_exprs).unwrap()
+}
+
 /// Create streams of int64 (where approximately 1/3 values is repeated)
 fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
     let mut values = DataGenerator::new(input_size).i64_values();
@@ -628,6 +763,78 @@ fn mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedB
     })
 }
 
+/// Create (key, payload_low, payload_high) streams, sorted only by key when `sorted`
+fn i64_key_utf8_payload_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
+    let mut data_gen = DataGenerator::new(input_size);
+    let keys = data_gen.i64_values_for_sort_input(sorted); // pre-sorted when `sorted`
+    let mut tuples: Vec<_> = keys
+        .into_iter()
+        .zip(data_gen.utf8_low_cardinality_values())
+        .zip(data_gen.utf8_high_cardinality_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable_by_key(|((key, _), _)| *key); // order by key only
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, payload_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (keys, payload_low): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+        let key: Int64Array = keys.into_iter().collect();
+        let payload_low: StringArray = payload_low.into_iter().collect();
+        let payload_high: StringArray = payload_high.into_iter().collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("key", Arc::new(key) as _),
+            ("payload_low", Arc::new(payload_low) as _),
+            ("payload_high", Arc::new(payload_high) as _),
+        ])
+        .unwrap()
+    })
+}
+
+/// Create (key, 3 dictionary payloads) streams, sorted only by key when `sorted`
+fn i64_key_dictionary_payload_streams(
+    sorted: bool,
+    input_size: u64,
+) -> PartitionedBatches {
+    let mut data_gen = DataGenerator::new(input_size);
+    let keys = data_gen.i64_values_for_sort_input(sorted);
+    let mut tuples: Vec<_> = keys
+        .into_iter()
+        .zip(data_gen.utf8_low_cardinality_values())
+        .zip(data_gen.utf8_low_cardinality_values())
+        .zip(data_gen.utf8_low_cardinality_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable_by_key(|(((key, _), _), _)| *key);
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, payload_c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (tuples, payload_b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (keys, payload_a): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+        let key: Int64Array = keys.into_iter().collect();
+        let payload_a: DictionaryArray<Int32Type> =
+            payload_a.iter().map(Option::as_deref).collect();
+        let payload_b: DictionaryArray<Int32Type> =
+            payload_b.iter().map(Option::as_deref).collect();
+        let payload_c: DictionaryArray<Int32Type> =
+            payload_c.iter().map(Option::as_deref).collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("key", Arc::new(key) as _),
+            ("payload_a", Arc::new(payload_a) as _),
+            ("payload_b", Arc::new(payload_b) as _),
+            ("payload_c", Arc::new(payload_c) as _),
+        ])
+        .unwrap()
+    })
+}
+
 /// Encapsulates creating data for this test
 struct DataGenerator {
     rng: StdRng,
@@ -644,10 +851,7 @@ impl DataGenerator {
 
     /// Create an array of i64 sorted values (where approximately 1/3 values is repeated)
     fn i64_values(&mut self) -> Vec<i64> {
-        let mut vec: Vec<_> = (0..self.input_size)
-            .map(|_| self.rng.random_range(0..self.input_size as i64))
-            .collect();
-
+        let mut vec = self.i64_unsorted_values();
         vec.sort_unstable();
 
         // 6287 distinct / 10000 total
@@ -656,6 +860,22 @@ impl DataGenerator {
         vec
     }
 
+    /// Create i64 values for a sort input: sorted when `sorted` is true, else unsorted
+    fn i64_values_for_sort_input(&mut self, sorted: bool) -> Vec<i64> {
+        if sorted {
+            self.i64_values()
+        } else {
+            self.i64_unsorted_values()
+        }
+    }
+
+    /// Create an array of `input_size` unsorted i64 values drawn from `0..input_size`
+    fn i64_unsorted_values(&mut self) -> Vec<i64> {
+        (0..self.input_size)
+            .map(|_| self.rng.random_range(0..self.input_size as i64))
+            .collect()
+    }
+
     /// Create an array of f64 sorted values (with same distribution of `i64_values`)
     fn f64_values(&mut self) -> Vec<f64> {
         self.i64_values().into_iter().map(|v| v as f64).collect()