diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index f168bdcc6a69f..1120fd998ab3b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -166,10 +166,6 @@ harness = false name = "jit" required-features = ["jit"] -[[bench]] -harness = false -name = "merge" - [[bench]] harness = false name = "sort" diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs deleted file mode 100644 index f1c4736039f9c..0000000000000 --- a/datafusion/core/benches/merge.rs +++ /dev/null @@ -1,624 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Benchmarks for Merge performance -//! -//! Each benchmark: -//! 1. Creates a sorted RecordBatch of some number of columns -//! -//! 2. Divides that `RecordBatch` into some number of "streams" -//! (`RecordBatch`s with a subset of the rows, still ordered) -//! -//! 3. Times how long it takes for [`SortPreservingMergeExec`] to -//! merge the "streams" back together into the original RecordBatch. -//! -//! Pictorally: -//! -//! ``` -//! Rows are randombly -//! divided into separate -//! RecordBatch "streams", -//! ┌────┐ ┌────┐ ┌────┐ preserving the order ┌────┐ ┌────┐ ┌────┐ -//! │ │ │ │ │ │ │ │ │ │ │ │ -//! │ │ │ │ │ │ ──────────────┐ │ │ │ │ │ │ -//! │ │ │ │ │ │ └─────────────▶ │ C1 │ │... │ │ CN │ -//! │ │ │ │ │ │ ───────────────┐ │ │ │ │ │ │ -//! │ │ │ │ │ │ ┌┼─────────────▶ │ │ │ │ │ │ -//! │ │ │ │ │ │ ││ │ │ │ │ │ │ -//! │ │ │ │ │ │ ││ └────┘ └────┘ └────┘ -//! │ │ │ │ │ │ ││ ┌────┐ ┌────┐ ┌────┐ -//! │ │ │ │ │ │ │└───────────────▶│ │ │ │ │ │ -//! │ │ │ │ │ │ │ │ │ │ │ │ │ -//! │ │ │ │ │ │ ... │ │ C1 │ │... │ │ CN │ -//! │ │ │ │ │ │ ──────────────┘ │ │ │ │ │ │ -//! │ │ │ │ │ │ ┌──────────────▶ │ │ │ │ │ │ -//! │ C1 │ │... │ │ CN │ │ │ │ │ │ │ │ -//! │ │ │ │ │ │───────────────┐│ └────┘ └────┘ └────┘ -//! │ │ │ │ │ │ ││ -//! │ │ │ │ │ │ ││ -//! │ │ │ │ │ │ ││ ... -//! │ │ │ │ │ │ ────────────┼┼┐ -//! │ │ │ │ │ │ │││ -//! │ │ │ │ │ │ │││ ┌────┐ ┌────┐ ┌────┐ -//! │ │ │ │ │ │ ──────────────┼┘│ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ │ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ │ │ C1 │ │... │ │ CN │ -//! │ │ │ │ │ │ └─┼────────────▶ │ │ │ │ │ │ -//! │ │ │ │ │ │ │ │ │ │ │ │ │ -//! │ │ │ │ │ │ └─────────────▶ │ │ │ │ │ │ -//! └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ -//! Input RecordBatch NUM_STREAMS input -//! Columns 1..N RecordBatches -//! INPUT_SIZE sorted rows (still INPUT_SIZE total -//! ~10% duplicates rows) -//! ``` - -use std::sync::Arc; - -use arrow::array::DictionaryArray; -use arrow::datatypes::Int32Type; -use arrow::{ - array::{Float64Array, Int64Array, StringArray, UInt64Array}, - compute::{self, SortOptions, TakeOptions}, - datatypes::Schema, - record_batch::RecordBatch, -}; - -/// Benchmarks for SortPreservingMerge stream -use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::physical_plan::sorts::sort::SortExec; -use datafusion::{ - execution::context::TaskContext, - physical_plan::{ - memory::MemoryExec, sorts::sort_preserving_merge::SortPreservingMergeExec, - ExecutionPlan, - }, - prelude::SessionContext, -}; -use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; -use futures::StreamExt; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; -use tokio::runtime::Runtime; - -use lazy_static::lazy_static; - -/// Total number of streams to divide each input into -/// models 8 partition plan (should it be 16??) -const NUM_STREAMS: u64 = 8; - -/// Total number of input rows to generate -const INPUT_SIZE: u64 = 100000; -// cases: - -// * physical sort expr (X, Y Z, NULLS FIRST, ASC) (not parameterized) -// -// streams of distinct values -// streams with 10% duplicated values (within each stream, and across streams) -// These cases are intended to model important usecases in TPCH -// parameters: -// -// Input schemas -lazy_static! { - static ref I64_STREAMS: Vec> = i64_streams(); - static ref F64_STREAMS: Vec> = f64_streams(); - - static ref UTF8_LOW_CARDINALITY_STREAMS: Vec> = utf8_low_cardinality_streams(); - static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec> = utf8_high_cardinality_streams(); - - static ref DICTIONARY_STREAMS: Vec> = dictionary_streams(); - static ref DICTIONARY_TUPLE_STREAMS: Vec> = dictionary_tuple_streams(); - static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec> = mixed_dictionary_tuple_streams(); - // * (string(low), string(low), string(high)) -- tpch q1 + iox - static ref UTF8_TUPLE_STREAMS: Vec> = utf8_tuple_streams(); - // * (f64, string, string, int) -- tpch q2 - static ref MIXED_TUPLE_STREAMS: Vec> = mixed_tuple_streams(); - -} - -fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("merge i64", |b| { - let case = MergeBenchCase::new(&I64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge i64 SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&I64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge f64", |b| { - let case = MergeBenchCase::new(&F64_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("merge f64 SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&F64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 low cardinality", |b| { - let case = MergeBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 low cardinality SortExec", |b| { - let case = MergeBenchCase::new_with_sort_input(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 high cardinality", |b| { - let case = MergeBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 high cardinality SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 tuple", |b| { - let case = MergeBenchCase::new(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 tuple SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 dictionary", |b| { - let case = MergeBenchCase::new(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 dictionary SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 dictionary tuple", |b| { - let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("merge utf8 dictionary tuple SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("merge mixed utf8 dictionary tuple", |b| { - let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("merge mixed utf8 dictionary tuple SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&MIXED_DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("merge mixed tuple", |b| { - let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("merge mixed tuple SortExec input", |b| { - let case = MergeBenchCase::new_with_sort_input(&MIXED_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); -} - -/// Encapsulates running each test case -struct MergeBenchCase { - runtime: Runtime, - task_ctx: Arc, - - // The plan to run - plan: Arc, -} - -impl MergeBenchCase { - /// Prepare to run a benchmark that merges the specified - /// partitions (streams) together using all keyes - fn new(partitions: &[Vec]) -> Self { - let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - - let schema = partitions[0][0].schema(); - let sort = make_sort_exprs(schema.as_ref()); - - let projection = None; - let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); - - Self { - runtime, - task_ctx, - plan, - } - } - - fn new_with_sort_input(partitions: &[Vec]) -> Self { - let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - - let schema = partitions[0][0].schema(); - let sort = make_sort_exprs(schema.as_ref()); - - let projection = None; - let exec = Arc::new(MemoryExec::try_new(partitions, schema, projection).unwrap()); - let sort_exec = SortExec::try_new(sort.to_owned(), exec, None).unwrap(); - let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(sort_exec))); - - Self { - runtime, - task_ctx, - plan, - } - } - - /// runs the specified plan to completion, draining all input and - /// panic'ing on error - fn run(&self) { - let plan = Arc::clone(&self.plan); - let task_ctx = Arc::clone(&self.task_ctx); - - assert_eq!(plan.output_partitioning().partition_count(), 1); - - self.runtime.block_on(async move { - let mut stream = plan.execute(0, task_ctx).unwrap(); - while let Some(b) = stream.next().await { - b.expect("unexpected execution error"); - } - }) - } -} - -/// Make sort exprs for each column in `schema` -fn make_sort_exprs(schema: &Schema) -> Vec { - schema - .fields() - .iter() - .map(|f| PhysicalSortExpr { - expr: col(f.name(), schema).unwrap(), - options: SortOptions::default(), - }) - .collect() -} - -/// Create streams of int64 (where approximately 1/3 values is repeated) -fn i64_streams() -> Vec> { - let array: Int64Array = DataGenerator::new().i64_values().into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) -} - -/// Create streams of f64 (where approximately 1/3 values are repeated) -/// with the same distribution as i64_streams -fn f64_streams() -> Vec> { - let array: Float64Array = DataGenerator::new().f64_values().into_iter().collect(); - let batch = RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) -} - -/// Create streams of random low cardinality utf8 values -fn utf8_low_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_low_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) -} - -/// Create streams of high cardinality (~ no duplicates) utf8 values -fn utf8_high_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_high_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) -} - -/// Create a batch of (utf8_low, utf8_low, utf8_high) -fn utf8_tuple_streams() -> Vec> { - let mut gen = DataGenerator::new(); - - // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen - .utf8_low_cardinality_values() - .into_iter() - .zip(gen.utf8_low_cardinality_values().into_iter()) - .zip(gen.utf8_high_cardinality_values().into_iter()) - .collect(); - - tuples.sort_unstable(); - - let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let utf8_high: StringArray = utf8_high.into_iter().collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("utf_high", Arc::new(utf8_high) as _), - ]) - .unwrap(); - - split_batch(batch) -} - -/// Create a batch of (f64, utf8_low, utf8_low, i64) -fn mixed_tuple_streams() -> Vec> { - let mut gen = DataGenerator::new(); - - // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen - .i64_values() - .into_iter() - .zip(gen.utf8_low_cardinality_values().into_iter()) - .zip(gen.utf8_low_cardinality_values().into_iter()) - .zip(gen.i64_values().into_iter()) - .collect(); - tuples.sort_unstable(); - - let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - let i64_values: Int64Array = i64_values.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("f64", Arc::new(f64_values) as _), - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("i64", Arc::new(i64_values) as _), - ]) - .unwrap(); - - split_batch(batch) -} - -/// Create a batch of (utf8_dict) -fn dictionary_streams() -> Vec> { - let mut gen = DataGenerator::new(); - let values = gen.utf8_low_cardinality_values(); - let dictionary: DictionaryArray = - values.iter().map(Option::as_deref).collect(); - - let batch = - RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap(); - - split_batch(batch) -} - -/// Create a batch of (utf8_dict, utf8_dict, utf8_dict) -fn dictionary_tuple_streams() -> Vec> { - let mut gen = DataGenerator::new(); - let mut tuples: Vec<_> = gen - .utf8_low_cardinality_values() - .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) - .collect(); - tuples.sort_unstable(); - - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ]) - .unwrap(); - - split_batch(batch) -} - -/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) -fn mixed_dictionary_tuple_streams() -> Vec> { - let mut gen = DataGenerator::new(); - let mut tuples: Vec<_> = gen - .utf8_low_cardinality_values() - .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.i64_values()) - .collect(); - tuples.sort_unstable(); - - let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - let d: Int64Array = d.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ("d", Arc::new(d) as _), - ]) - .unwrap(); - - split_batch(batch) -} - -/// Encapsulates creating data for this test -struct DataGenerator { - rng: StdRng, -} - -impl DataGenerator { - fn new() -> Self { - Self { - rng: StdRng::seed_from_u64(42), - } - } - - /// Create an array of i64 sorted values (where approximately 1/3 values is repeated) - fn i64_values(&mut self) -> Vec { - let mut vec: Vec<_> = (0..INPUT_SIZE) - .map(|_| self.rng.gen_range(0..INPUT_SIZE as i64)) - .collect(); - - vec.sort_unstable(); - - // 6287 distinct / 10000 total - //let num_distinct = vec.iter().collect::>().len(); - //println!("{} distinct / {} total", num_distinct, vec.len()); - vec - } - - /// Create an array of f64 sorted values (with same distribution of `i64_values`) - fn f64_values(&mut self) -> Vec { - self.i64_values().into_iter().map(|v| v as f64).collect() - } - - /// array of low cardinality (100 distinct) values - fn utf8_low_cardinality_values(&mut self) -> Vec>> { - let strings = (0..100) - .map(|s| format!("value{s}").into()) - .collect::>(); - - // pick from the 100 strings randomly - let mut input = (0..INPUT_SIZE) - .map(|_| { - let idx = self.rng.gen_range(0..strings.len()); - let s = Arc::clone(&strings[idx]); - Some(s) - }) - .collect::>(); - - input.sort_unstable(); - input - } - - /// Create sorted values of high cardinality (~ no duplicates) utf8 values - fn utf8_high_cardinality_values(&mut self) -> Vec> { - // make random strings - let mut input = (0..INPUT_SIZE) - .map(|_| Some(self.random_string())) - .collect::>(); - - input.sort_unstable(); - input - } - - fn random_string(&mut self) -> String { - let rng = &mut self.rng; - rng.sample_iter(rand::distributions::Alphanumeric) - .filter(|c| c.is_ascii_alphabetic()) - .take(20) - .map(char::from) - .collect::() - } -} - -/// Splits the (sorted) `input_batch` randomly into `NUM_STREAMS` approximately evenly sorted streams -fn split_batch(input_batch: RecordBatch) -> Vec> { - // figure out which inputs go where - let mut rng = StdRng::seed_from_u64(1337); - - // randomly assign rows to streams - let stream_assignments = (0..input_batch.num_rows()) - .map(|_| rng.gen_range(0..NUM_STREAMS)) - .collect(); - - // split the inputs into streams - (0..NUM_STREAMS) - .map(|stream| { - // make a "stream" of 1 record batch - vec![take_columns(&input_batch, &stream_assignments, stream)] - }) - .collect::>() -} - -/// returns a record batch that contains all there values where -/// stream_assignment[i] = stream (aka this is the equivalent of -/// calling take(indicies) where indicies[i] == stream_index) -fn take_columns( - input_batch: &RecordBatch, - stream_assignments: &UInt64Array, - stream: u64, -) -> RecordBatch { - // find just the indicies needed from record batches to extract - let stream_indices: UInt64Array = stream_assignments - .iter() - .enumerate() - .filter_map(|(idx, stream_idx)| { - if stream_idx.unwrap() == stream { - Some(idx as u64) - } else { - None - } - }) - .collect(); - - let options = Some(TakeOptions { check_bounds: true }); - - // now, get the columns from each array - let new_columns = input_batch - .columns() - .iter() - .map(|array| compute::take(array, &stream_indices, options.clone()).unwrap()) - .collect(); - - RecordBatch::try_new(input_batch.schema(), new_columns).unwrap() -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 0507a9308a289..907392bc3e341 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -15,24 +15,77 @@ // specific language governing permissions and limitations // under the License. -//! Adapted from merge benchmark. Primary difference is that the input data is not ordered. +//! Benchmarks for Merge and sort performance +//! +//! Each benchmark: +//! 1. Creates a list of tuples (sorted if necessary) +//! +//! 2. Divides those tuples across some number of streams of [`RecordBatch`] +//! preserving any ordering +//! +//! 3. Times how long it takes for a given sort plan to process the input +//! +//! Pictorially: +//! +//! ``` +//! Rows are randomly +//! divided into separate +//! RecordBatch "streams", +//! ┌────┐ ┌────┐ ┌────┐ preserving the order ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ ──────────────┐ │ │ │ │ │ │ +//! │ │ │ │ │ │ └─────────────▶ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ ───────────────┐ │ │ │ │ │ │ +//! │ │ │ │ │ │ ┌┼─────────────▶ │ │ │ │ │ │ +//! │ │ │ │ │ │ ││ │ │ │ │ │ │ +//! │ │ │ │ │ │ ││ └────┘ └────┘ └────┘ +//! │ │ │ │ │ │ ││ ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ │└───────────────▶│ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ ... │ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ ──────────────┘ │ │ │ │ │ │ +//! │ │ │ │ │ │ ┌──────────────▶ │ │ │ │ │ │ +//! │ C1 │ │... │ │ CN │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │───────────────┐│ └────┘ └────┘ └────┘ +//! │ │ │ │ │ │ ││ +//! │ │ │ │ │ │ ││ +//! │ │ │ │ │ │ ││ ... +//! │ │ │ │ │ │ ────────────┼┼┐ +//! │ │ │ │ │ │ │││ +//! │ │ │ │ │ │ │││ ┌────┐ ┌────┐ ┌────┐ +//! │ │ │ │ │ │ ──────────────┼┘│ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ C1 │ │... │ │ CN │ +//! │ │ │ │ │ │ └─┼────────────▶ │ │ │ │ │ │ +//! │ │ │ │ │ │ │ │ │ │ │ │ │ +//! │ │ │ │ │ │ └─────────────▶ │ │ │ │ │ │ +//! └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ +//! Input RecordBatch NUM_STREAMS input +//! Columns 1..N RecordBatches +//! INPUT_SIZE sorted rows (still INPUT_SIZE total +//! ~10% duplicates rows) +//! ``` + use std::sync::Arc; use arrow::array::DictionaryArray; use arrow::datatypes::Int32Type; use arrow::{ - array::{Float64Array, Int64Array, StringArray, UInt64Array}, - compute::{self, SortOptions, TakeOptions}, + array::{Float64Array, Int64Array, StringArray}, + compute::SortOptions, datatypes::Schema, record_batch::RecordBatch, }; -/// Benchmarks for SortExec +/// Benchmarks for SortPreservingMerge stream use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::{ execution::context::TaskContext, - physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan}, + physical_plan::{ + memory::MemoryExec, sorts::sort_preserving_merge::SortPreservingMergeExec, + ExecutionPlan, + }, prelude::SessionContext, }; use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; @@ -41,145 +94,62 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use tokio::runtime::Runtime; -use lazy_static::lazy_static; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; /// Total number of streams to divide each input into /// models 8 partition plan (should it be 16??) -const NUM_STREAMS: u64 = 8; +const NUM_STREAMS: usize = 8; + +/// The size of each batch within each stream +const BATCH_SIZE: usize = 1024; /// Total number of input rows to generate const INPUT_SIZE: u64 = 100000; -// cases: -// * physical sort expr (X, Y Z, NULLS FIRST, ASC) (not parameterized) -// -// streams of distinct values -// streams with 10% duplicated values (within each stream, and across streams) -// These cases are intended to model important usecases in TPCH -// parameters: -// -// Input schemas -lazy_static! { - static ref I64_STREAMS: Vec> = i64_streams(); - static ref F64_STREAMS: Vec> = f64_streams(); - - static ref UTF8_LOW_CARDINALITY_STREAMS: Vec> = utf8_low_cardinality_streams(); - static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec> = utf8_high_cardinality_streams(); - - static ref DICTIONARY_STREAMS: Vec> = dictionary_streams(); - static ref DICTIONARY_TUPLE_STREAMS: Vec> = dictionary_tuple_streams(); - static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec> = mixed_dictionary_tuple_streams(); - // * (string(low), string(low), string(high)) -- tpch q1 + iox - static ref UTF8_TUPLE_STREAMS: Vec> = utf8_tuple_streams(); - // * (f64, string, string, int) -- tpch q2 - static ref MIXED_TUPLE_STREAMS: Vec> = mixed_tuple_streams(); - -} +type PartitionedBatches = Vec>; fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("sort i64", |b| { - let case = SortBenchCase::new(&I64_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort i64 preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&I64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort f64", |b| { - let case = SortBenchCase::new(&F64_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort f64 preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&F64_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 low cardinality", |b| { - let case = SortBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 low cardinality preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_LOW_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 high cardinality", |b| { - let case = SortBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 high cardinality preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_HIGH_CARDINALITY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 tuple", |b| { - let case = SortBenchCase::new(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&UTF8_TUPLE_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 dictionary", |b| { - let case = SortBenchCase::new(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 dictionary preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&DICTIONARY_STREAMS); - - b.iter(move || case.run()) - }); - - c.bench_function("sort utf8 dictionary tuple", |b| { - let case = SortBenchCase::new(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - c.bench_function("sort utf8 dictionary tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function("sort mixed utf8 dictionary tuple", |b| { - let case = SortBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS); - b.iter(move || case.run()) - }); - - c.bench_function( - "sort mixed utf8 dictionary tuple preserve partitioning", - |b| { - let case = - SortBenchCasePreservePartitioning::new(&MIXED_DICTIONARY_TUPLE_STREAMS); + let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![ + ("i64", &i64_streams), + ("f64", &f64_streams), + ("utf8 low cardinality", &utf8_low_cardinality_streams), + ("utf8 high cardinality", &utf8_high_cardinality_streams), + ("utf8 tuple", &utf8_tuple_streams), + ("utf8 dictionary", &dictionary_streams), + ("utf8 dictionary tuple", &dictionary_tuple_streams), + ("mixed dictionary tuple", &mixed_dictionary_tuple_streams), + ("mixed tuple", &mixed_tuple_streams), + ]; + + for (name, f) in cases { + c.bench_function(&format!("merge sorted {name}"), |b| { + let data = f(true); + let case = BenchCase::merge_sorted(&data); b.iter(move || case.run()) - }, - ); + }); - c.bench_function("sort mixed tuple", |b| { - let case = SortBenchCase::new(&MIXED_TUPLE_STREAMS); + c.bench_function(&format!("sort merge {name}"), |b| { + let data = f(false); + let case = BenchCase::sort_merge(&data); + b.iter(move || case.run()) + }); - b.iter(move || case.run()) - }); - c.bench_function("sort mixed tuple preserve partitioning", |b| { - let case = SortBenchCasePreservePartitioning::new(&MIXED_TUPLE_STREAMS); + c.bench_function(&format!("sort {name}"), |b| { + let data = f(false); + let case = BenchCase::sort(&data); + b.iter(move || case.run()) + }); - b.iter(move || case.run()) - }); + c.bench_function(&format!("sort partitioned {name}"), |b| { + let data = f(false); + let case = BenchCase::sort_partitioned(&data); + b.iter(move || case.run()) + }); + } } -/// Encapsulates running a test case where input partitioning is not preserved. -struct SortBenchCase { +/// Encapsulates running each test case +struct BenchCase { runtime: Runtime, task_ctx: Arc, @@ -187,10 +157,10 @@ struct SortBenchCase { plan: Arc, } -impl SortBenchCase { +impl BenchCase { /// Prepare to run a benchmark that merges the specified - /// partitions (streams) together using all keyes - fn new(partitions: &[Vec]) -> Self { + /// pre-sorted partitions (streams) together using all keys + fn merge_sorted(partitions: &[Vec]) -> Self { let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -198,10 +168,8 @@ impl SortBenchCase { let schema = partitions[0][0].schema(); let sort = make_sort_exprs(schema.as_ref()); - let projection = None; - let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); - let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); Self { runtime, @@ -210,37 +178,51 @@ impl SortBenchCase { } } - /// runs the specified plan to completion, draining all input and - /// panic'ing on error - fn run(&self) { - let plan = Arc::clone(&self.plan); - let task_ctx = Arc::clone(&self.task_ctx); + /// Test SortExec in "partitioned" mode followed by a SortPreservingMerge + fn sort_merge(partitions: &[Vec]) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); - assert_eq!(plan.output_partitioning().partition_count(), 1); + let schema = partitions[0][0].schema(); + let sort = make_sort_exprs(schema.as_ref()); - self.runtime.block_on(async move { - let mut stream = plan.execute(0, task_ctx).unwrap(); - while let Some(b) = stream.next().await { - b.expect("unexpected execution error"); - } - }) + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = + SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None); + let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); + + Self { + runtime, + task_ctx, + plan, + } } -} -/// Encapsulates running a test case where input partitioning is not preserved. -struct SortBenchCasePreservePartitioning { - runtime: Runtime, - task_ctx: Arc, - // The plan to run - plan: Arc, - partition_count: usize, -} + /// Test SortExec in "partitioned" mode which sorts the input streams + /// individually into some number of output streams + fn sort(partitions: &[Vec]) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); -impl SortBenchCasePreservePartitioning { - /// Prepare to run a benchmark that merges the specified - /// partitions (streams) together using all keyes - fn new(partitions: &[Vec]) -> Self { - let partition_count = partitions.len(); + let schema = partitions[0][0].schema(); + let sort = make_sort_exprs(schema.as_ref()); + + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); + let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); + + Self { + runtime, + task_ctx, + plan, + } + } + + /// Test SortExec in "partitioned" mode which sorts the input streams + /// individually into some number of output streams + fn sort_partitioned(partitions: &[Vec]) -> Self { let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -248,20 +230,14 @@ impl SortBenchCasePreservePartitioning { let schema = partitions[0][0].schema(); let sort = make_sort_exprs(schema.as_ref()); - let projection = None; - let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let plan = Arc::new(SortExec::new_with_partitioning( - sort, - Arc::new(exec), - true, - None, - )); + let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None); + let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); Self { runtime, task_ctx, plan, - partition_count, } } @@ -271,17 +247,12 @@ impl SortBenchCasePreservePartitioning { let plan = Arc::clone(&self.plan); let task_ctx = Arc::clone(&self.task_ctx); - assert_eq!( - plan.output_partitioning().partition_count(), - self.partition_count - ); + assert_eq!(plan.output_partitioning().partition_count(), 1); self.runtime.block_on(async move { - for i in 0..self.partition_count { - let mut stream = plan.execute(i, task_ctx.clone()).unwrap(); - while let Some(b) = stream.next().await { - b.expect("unexpected execution error"); - } + let mut stream = plan.execute(0, task_ctx).unwrap(); + while let Some(b) = stream.next().await { + b.expect("unexpected execution error"); } }) } @@ -300,51 +271,58 @@ fn make_sort_exprs(schema: &Schema) -> Vec { } /// Create streams of int64 (where approximately 1/3 values is repeated) -fn i64_streams() -> Vec> { - let array: Int64Array = DataGenerator::new().i64_values().into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap(); +fn i64_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().i64_values(); + if sorted { + values.sort_unstable(); + } - split_batch(batch) + split_tuples(values, |v| { + let array = Int64Array::from(v); + RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of f64 (where approximately 1/3 values are repeated) /// with the same distribution as i64_streams -fn f64_streams() -> Vec> { - let array: Float64Array = DataGenerator::new().f64_values().into_iter().collect(); - let batch = RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap(); +fn f64_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().f64_values(); + if sorted { + values.sort_unstable_by(|a, b| a.total_cmp(b)); + } - split_batch(batch) + split_tuples(values, |v| { + let array = Float64Array::from(v); + RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of random low cardinality utf8 values -fn utf8_low_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_low_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) +fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().utf8_low_cardinality_values(); + if sorted { + values.sort_unstable(); + } + split_tuples(values, |v| { + let array: StringArray = v.into_iter().collect(); + RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap() + }) } /// Create streams of high cardinality (~ no duplicates) utf8 values -fn utf8_high_cardinality_streams() -> Vec> { - let array: StringArray = DataGenerator::new() - .utf8_high_cardinality_values() - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap(); - - split_batch(batch) +fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { + let mut values = DataGenerator::new().utf8_high_cardinality_values(); + if sorted { + values.sort_unstable(); + } + split_tuples(values, |v| { + let array: StringArray = v.into_iter().collect(); + RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap() + }) } /// Create a batch of (utf8_low, utf8_low, utf8_high) -fn utf8_tuple_streams() -> Vec> { +fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); // need to sort by the combined key, so combine them together @@ -355,27 +333,29 @@ fn utf8_tuple_streams() -> Vec> { .zip(gen.utf8_high_cardinality_values().into_iter()) .collect(); - tuples.sort_unstable(); - - let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let utf8_high: StringArray = utf8_high.into_iter().collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("utf_high", Arc::new(utf8_high) as _), - ]) - .unwrap(); + if sorted { + tuples.sort_unstable(); + } - split_batch(batch) + split_tuples(tuples, |tuples| { + let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let utf8_high: StringArray = utf8_high.into_iter().collect(); + let utf8_low1: StringArray = utf8_low1.into_iter().collect(); + let utf8_low2: StringArray = utf8_low2.into_iter().collect(); + + RecordBatch::try_from_iter(vec![ + ("utf_low1", Arc::new(utf8_low1) as _), + ("utf_low2", Arc::new(utf8_low2) as _), + ("utf_high", Arc::new(utf8_high) as _), + ]) + .unwrap() + }) } /// Create a batch of (f64, utf8_low, utf8_low, i64) -fn mixed_tuple_streams() -> Vec> { +fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); // need to sort by the combined key, so combine them together @@ -386,43 +366,50 @@ fn mixed_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values().into_iter()) .zip(gen.i64_values().into_iter()) .collect(); - tuples.sort_unstable(); - - let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect(); - let utf8_low1: StringArray = utf8_low1.into_iter().collect(); - let utf8_low2: StringArray = utf8_low2.into_iter().collect(); - let i64_values: Int64Array = i64_values.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("f64", Arc::new(f64_values) as _), - ("utf_low1", Arc::new(utf8_low1) as _), - ("utf_low2", Arc::new(utf8_low2) as _), - ("i64", Arc::new(i64_values) as _), - ]) - .unwrap(); - - split_batch(batch) + + if sorted { + tuples.sort_unstable(); + } + + split_tuples(tuples, |tuples| { + let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect(); + + let utf8_low1: StringArray = utf8_low1.into_iter().collect(); + let utf8_low2: StringArray = utf8_low2.into_iter().collect(); + let i64_values: Int64Array = i64_values.into_iter().collect(); + + RecordBatch::try_from_iter(vec![ + ("f64", Arc::new(f64_values) as _), + ("utf_low1", Arc::new(utf8_low1) as _), + ("utf_low2", Arc::new(utf8_low2) as _), + ("i64", Arc::new(i64_values) as _), + ]) + .unwrap() + }) } /// Create a batch of (utf8_dict) -fn dictionary_streams() -> Vec> { +fn dictionary_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); - let values = gen.utf8_low_cardinality_values(); - let dictionary: DictionaryArray = - values.iter().map(Option::as_deref).collect(); + let mut values = gen.utf8_low_cardinality_values(); + if sorted { + values.sort_unstable(); + } - let batch = - RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap(); + split_tuples(values, |v| { + let dictionary: DictionaryArray = + v.iter().map(Option::as_deref).collect(); - split_batch(batch) + RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap() + }) } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict) -fn dictionary_tuple_streams() -> Vec> { +fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); let mut tuples: Vec<_> = gen .utf8_low_cardinality_values() @@ -430,27 +417,30 @@ fn dictionary_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values()) .zip(gen.utf8_low_cardinality_values()) .collect(); - tuples.sort_unstable(); - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ]) - .unwrap(); + if sorted { + tuples.sort_unstable(); + } - split_batch(batch) + split_tuples(tuples, |tuples| { + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + + RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ]) + .unwrap() + }) } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) -fn mixed_dictionary_tuple_streams() -> Vec> { +fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { let mut gen = DataGenerator::new(); let mut tuples: Vec<_> = gen .utf8_low_cardinality_values() @@ -459,26 +449,29 @@ fn mixed_dictionary_tuple_streams() -> Vec> { .zip(gen.utf8_low_cardinality_values()) .zip(gen.i64_values()) .collect(); - tuples.sort_unstable(); - - let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); - - let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); - let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); - let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); - let d: Int64Array = d.into_iter().collect(); - - let batch = RecordBatch::try_from_iter(vec![ - ("a", Arc::new(a) as _), - ("b", Arc::new(b) as _), - ("c", Arc::new(c) as _), - ("d", Arc::new(d) as _), - ]) - .unwrap(); - - split_batch(batch) + + if sorted { + tuples.sort_unstable(); + } + + split_tuples(tuples, |tuples| { + let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + let d: Int64Array = d.into_iter().collect(); + + RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ("d", Arc::new(d) as _), + ]) + .unwrap() + }) } /// Encapsulates creating data for this test @@ -493,11 +486,18 @@ impl DataGenerator { } } - /// Create an array of i64 unsorted values (where approximately 1/3 values is repeated) + /// Create an array of i64 sorted values (where approximately 1/3 values is repeated) fn i64_values(&mut self) -> Vec { - (0..INPUT_SIZE) + let mut vec: Vec<_> = (0..INPUT_SIZE) .map(|_| self.rng.gen_range(0..INPUT_SIZE as i64)) - .collect() + .collect(); + + vec.sort_unstable(); + + // 6287 distinct / 10000 total + //let num_distinct = vec.iter().collect::>().len(); + //println!("{} distinct / {} total", num_distinct, vec.len()); + vec } /// Create an array of f64 sorted values (with same distribution of `i64_values`) @@ -512,21 +512,27 @@ impl DataGenerator { .collect::>(); // pick from the 100 strings randomly - (0..INPUT_SIZE) + let mut input = (0..INPUT_SIZE) .map(|_| { let idx = self.rng.gen_range(0..strings.len()); let s = Arc::clone(&strings[idx]); Some(s) }) - .collect::>() + .collect::>(); + + input.sort_unstable(); + input } - /// Create values of high cardinality (~ no duplicates) utf8 values + /// Create sorted values of high cardinality (~ no duplicates) utf8 values fn utf8_high_cardinality_values(&mut self) -> Vec> { // make random strings - (0..INPUT_SIZE) + let mut input = (0..INPUT_SIZE) .map(|_| Some(self.random_string())) - .collect::>() + .collect::>(); + + input.sort_unstable(); + input } fn random_string(&mut self) -> String { @@ -539,56 +545,36 @@ impl DataGenerator { } } -/// Splits the `input_batch` randomly into `NUM_STREAMS` approximately evenly sorted streams -fn split_batch(input_batch: RecordBatch) -> Vec> { +/// Splits the `input` tuples randomly into batches of `BATCH_SIZE` distributed across +/// `NUM_STREAMS` partitions, preserving any ordering +/// +/// `f` is function that takes a list of tuples and produces a [`RecordBatch`] +fn split_tuples(input: Vec, f: F) -> PartitionedBatches +where + F: Fn(Vec) -> RecordBatch, +{ // figure out which inputs go where let mut rng = StdRng::seed_from_u64(1337); - // randomly assign rows to streams - let stream_assignments = (0..input_batch.num_rows()) - .map(|_| rng.gen_range(0..NUM_STREAMS)) - .collect(); - - // split the inputs into streams - (0..NUM_STREAMS) - .map(|stream| { - // make a "stream" of 1 record batch - vec![take_columns(&input_batch, &stream_assignments, stream)] - }) - .collect::>() -} - -/// returns a record batch that contains all there values where -/// stream_assignment[i] = stream (aka this is the equivalent of -/// calling take(indicies) where indicies[i] == stream_index) -fn take_columns( - input_batch: &RecordBatch, - stream_assignments: &UInt64Array, - stream: u64, -) -> RecordBatch { - // find just the indicies needed from record batches to extract - let stream_indices: UInt64Array = stream_assignments - .iter() - .enumerate() - .filter_map(|(idx, stream_idx)| { - if stream_idx.unwrap() == stream { - Some(idx as u64) - } else { - None + let mut outputs: Vec>> = (0..NUM_STREAMS).map(|_| Vec::new()).collect(); + + for i in input { + let stream_idx = rng.gen_range(0..NUM_STREAMS); + let stream = &mut outputs[stream_idx]; + match stream.last_mut() { + Some(x) if x.len() < BATCH_SIZE => x.push(i), + _ => { + let mut v = Vec::with_capacity(BATCH_SIZE); + v.push(i); + stream.push(v) } - }) - .collect(); - - let options = Some(TakeOptions { check_bounds: true }); - - // now, get the columns from each array - let new_columns = input_batch - .columns() - .iter() - .map(|array| compute::take(array, &stream_indices, options.clone()).unwrap()) - .collect(); + } + } - RecordBatch::try_new(input_batch.schema(), new_columns).unwrap() + outputs + .into_iter() + .map(|stream| stream.into_iter().map(&f).collect()) + .collect() } criterion_group!(benches, criterion_benchmark);