Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 228 additions & 10 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ use tokio::runtime::Runtime;
/// models 8 partition plan (should it be 16??)
const NUM_STREAMS: usize = 8;

/// The size of each batch within each stream
const BATCH_SIZE: usize = 1024;

/// Input sizes to benchmark. The small size (100K) exercises the
/// in-memory concat-and-sort path; the large size (10M) exercises
/// the sort-then-merge path with high fan-in.
/// The size of each batch within each stream. 8192 rows matches DataFusion's
/// default target batch size and better models the batch size seen by SortExec
/// in normal query execution.
const BATCH_SIZE: usize = 8192;

/// Input sizes to benchmark. The small size (100K) exercises smaller in-memory
/// sorts; the large size (1M) exercises the sort-then-merge path with higher
/// fan-in.
const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")];

type PartitionedBatches = Vec<Vec<RecordBatch>>;
Expand Down Expand Up @@ -200,6 +202,47 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});
}

let key_only_cases: Vec<(&str, StreamGenerator)> = vec![
(
"i64 key utf8 payload",
Box::new(move |sorted| i64_key_utf8_payload_streams(sorted, input_size)),
),
(
"i64 key dictionary payload",
Box::new(move |sorted| {
i64_key_dictionary_payload_streams(sorted, input_size)
}),
),
];

for (name, f) in &key_only_cases {
let sort_columns = &["key"];

c.bench_function(&format!("merge sorted {name} {size_label}"), |b| {
let data = f(true);
let case = BenchCase::merge_sorted_by_columns(&data, sort_columns);
b.iter(move || case.run())
});

c.bench_function(&format!("sort merge {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort_merge_by_columns(&data, sort_columns);
b.iter(move || case.run())
});

c.bench_function(&format!("sort {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort_by_columns(&data, sort_columns);
b.iter(move || case.run())
});

c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| {
let data = f(false);
let case = BenchCase::sort_partitioned_by_columns(&data, sort_columns);
b.iter(move || case.run())
});
}
}
}

Expand Down Expand Up @@ -295,6 +338,90 @@ impl BenchCase {
}
}

/// Build a bench case that merges already-sorted partitions using a single
/// `SortPreservingMergeExec`, ordering on the named `sort_columns`.
fn merge_sorted_by_columns(
    partitions: &[Vec<RecordBatch>],
    sort_columns: &[&str],
) -> Self {
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();

    // All partitions share a schema; take it from the first batch.
    let schema = partitions[0][0].schema();
    let ordering = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);

    let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();

    Self {
        runtime,
        task_ctx,
        plan: Arc::new(SortPreservingMergeExec::new(ordering, source)),
    }
}

/// Build a bench case that sorts each partition independently (preserving
/// partitioning) and then merges the sorted streams with a
/// `SortPreservingMergeExec`.
fn sort_merge_by_columns(
    partitions: &[Vec<RecordBatch>],
    sort_columns: &[&str],
) -> Self {
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();

    // All partitions share a schema; take it from the first batch.
    let schema = partitions[0][0].schema();
    let ordering = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);

    let mem_exec = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
    // Per-partition sort: keep partitioning so the merge sees one sorted
    // stream per input partition.
    let sort_exec =
        SortExec::new(ordering.clone(), mem_exec).with_preserve_partitioning(true);
    let plan = Arc::new(SortPreservingMergeExec::new(ordering, Arc::new(sort_exec)));

    Self {
        runtime,
        task_ctx,
        plan,
    }
}

/// Build a bench case that first coalesces every partition into a single
/// stream and then sorts it with one `SortExec`.
fn sort_by_columns(partitions: &[Vec<RecordBatch>], sort_columns: &[&str]) -> Self {
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();

    // All partitions share a schema; take it from the first batch.
    let schema = partitions[0][0].schema();
    let ordering = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);

    let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
    let coalesced = Arc::new(CoalescePartitionsExec::new(source));

    Self {
        runtime,
        task_ctx,
        plan: Arc::new(SortExec::new(ordering, coalesced)),
    }
}

/// Build a bench case that sorts every partition independently (preserving
/// partitioning) and then simply concatenates the results with a
/// `CoalescePartitionsExec` — no merge, so the final output is not globally
/// ordered.
fn sort_partitioned_by_columns(
    partitions: &[Vec<RecordBatch>],
    sort_columns: &[&str],
) -> Self {
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();

    // All partitions share a schema; take it from the first batch.
    let schema = partitions[0][0].schema();
    let ordering = make_sort_exprs_for_columns(schema.as_ref(), sort_columns);

    let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
    let sorted = SortExec::new(ordering, source).with_preserve_partitioning(true);

    Self {
        runtime,
        task_ctx,
        plan: Arc::new(CoalescePartitionsExec::new(Arc::new(sorted))),
    }
}

/// runs the specified plan to completion, draining all input and
/// panic'ing on error
fn run(&self) {
Expand All @@ -321,6 +448,14 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering {
LexOrdering::new(sort_exprs).unwrap()
}

/// Build a lexicographic ordering over the named `sort_columns` of `schema`,
/// using default sort options for every column.
fn make_sort_exprs_for_columns(schema: &Schema, sort_columns: &[&str]) -> LexOrdering {
    let mut sort_exprs = Vec::with_capacity(sort_columns.len());
    for name in sort_columns {
        // `col` panics only if the column is missing from the schema,
        // which is a bug in the benchmark setup itself.
        sort_exprs.push(PhysicalSortExpr::new_default(col(name, schema).unwrap()));
    }
    LexOrdering::new(sort_exprs).unwrap()
}

/// Create streams of int64 (where approximately 1/3 values is repeated)
fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
let mut values = DataGenerator::new(input_size).i64_values();
Expand Down Expand Up @@ -628,6 +763,78 @@ fn mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedB
})
}

/// Create streams of (key, payload_low, payload_high) where only the i64
/// `key` column is ordered (when `sorted`); the two utf8 payload columns
/// stay in generated order.
fn i64_key_utf8_payload_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
    let mut data_gen = DataGenerator::new(input_size);
    let keys = data_gen.i64_values_for_sort_input(sorted);
    let mut tuples: Vec<_> = keys
        .into_iter()
        .zip(data_gen.utf8_low_cardinality_values())
        .zip(data_gen.utf8_high_cardinality_values())
        .collect();

    if sorted {
        // Order by key alone; payload values ride along unsorted.
        tuples.sort_unstable_by_key(|((key, _), _)| *key);
    }

    split_tuples(tuples, |chunk| {
        // Unpack ((key, low), high) tuples into three column vectors.
        let mut keys = Vec::with_capacity(chunk.len());
        let mut lows = Vec::with_capacity(chunk.len());
        let mut highs = Vec::with_capacity(chunk.len());
        for ((key, low), high) in chunk {
            keys.push(key);
            lows.push(low);
            highs.push(high);
        }

        let key: Int64Array = keys.into_iter().collect();
        let payload_low: StringArray = lows.into_iter().collect();
        let payload_high: StringArray = highs.into_iter().collect();

        RecordBatch::try_from_iter(vec![
            ("key", Arc::new(key) as _),
            ("payload_low", Arc::new(payload_low) as _),
            ("payload_high", Arc::new(payload_high) as _),
        ])
        .unwrap()
    })
}

/// Create streams of (key, payload_a, payload_b, payload_c) where only the
/// i64 `key` column is ordered (when `sorted`); the three dictionary-encoded
/// payload columns stay in generated order.
fn i64_key_dictionary_payload_streams(
    sorted: bool,
    input_size: u64,
) -> PartitionedBatches {
    let mut data_gen = DataGenerator::new(input_size);
    let keys = data_gen.i64_values_for_sort_input(sorted);
    let mut tuples: Vec<_> = keys
        .into_iter()
        .zip(data_gen.utf8_low_cardinality_values())
        .zip(data_gen.utf8_low_cardinality_values())
        .zip(data_gen.utf8_low_cardinality_values())
        .collect();

    if sorted {
        // Order by key alone; payload values ride along unsorted.
        tuples.sort_unstable_by_key(|(((key, _), _), _)| *key);
    }

    split_tuples(tuples, |chunk| {
        // Unpack (((key, a), b), c) tuples into four column vectors.
        let mut keys = Vec::with_capacity(chunk.len());
        let mut a_vals = Vec::with_capacity(chunk.len());
        let mut b_vals = Vec::with_capacity(chunk.len());
        let mut c_vals = Vec::with_capacity(chunk.len());
        for (((key, a), b), c) in chunk {
            keys.push(key);
            a_vals.push(a);
            b_vals.push(b);
            c_vals.push(c);
        }

        let key: Int64Array = keys.into_iter().collect();
        let payload_a: DictionaryArray<Int32Type> =
            a_vals.iter().map(Option::as_deref).collect();
        let payload_b: DictionaryArray<Int32Type> =
            b_vals.iter().map(Option::as_deref).collect();
        let payload_c: DictionaryArray<Int32Type> =
            c_vals.iter().map(Option::as_deref).collect();

        RecordBatch::try_from_iter(vec![
            ("key", Arc::new(key) as _),
            ("payload_a", Arc::new(payload_a) as _),
            ("payload_b", Arc::new(payload_b) as _),
            ("payload_c", Arc::new(payload_c) as _),
        ])
        .unwrap()
    })
}

/// Encapsulates creating data for this test
struct DataGenerator {
rng: StdRng,
Expand All @@ -644,10 +851,7 @@ impl DataGenerator {

/// Create an array of i64 sorted values (where approximately 1/3 values is repeated)
fn i64_values(&mut self) -> Vec<i64> {
let mut vec: Vec<_> = (0..self.input_size)
.map(|_| self.rng.random_range(0..self.input_size as i64))
.collect();

let mut vec = self.i64_unsorted_values();
vec.sort_unstable();

// 6287 distinct / 10000 total
Expand All @@ -656,6 +860,20 @@ impl DataGenerator {
vec
}

/// i64 key values for sort benchmarks: a sorted run (with repeated values)
/// when `sorted` is true, otherwise raw unsorted random values.
fn i64_values_for_sort_input(&mut self, sorted: bool) -> Vec<i64> {
    match sorted {
        true => self.i64_values(),
        false => self.i64_unsorted_values(),
    }
}

/// Generate `input_size` random i64 values drawn uniformly from
/// `0..input_size`, in generation (unsorted) order.
fn i64_unsorted_values(&mut self) -> Vec<i64> {
    let bound = self.input_size as i64;
    let mut values = Vec::with_capacity(self.input_size as usize);
    for _ in 0..self.input_size {
        values.push(self.rng.random_range(0..bound));
    }
    values
}

/// Create an array of f64 sorted values (with same distribution of `i64_values`)
fn f64_values(&mut self) -> Vec<f64> {
self.i64_values().into_iter().map(|v| v as f64).collect()
Expand Down
Loading