diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index f1a77741064e4..845de6213f4d3 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -95,3 +95,7 @@ harness = false
 [[bench]]
 name = "scalar"
 harness = false
+
+[[bench]]
+name = "physical_plan"
+harness = false
\ No newline at end of file
diff --git a/datafusion/benches/physical_plan.rs b/datafusion/benches/physical_plan.rs
new file mode 100644
index 0000000000000..9222ae131b8ff
--- /dev/null
+++ b/datafusion/benches/physical_plan.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+use criterion::{BatchSize, Criterion};
+extern crate arrow;
+extern crate datafusion;
+
+use std::{iter::FromIterator, sync::Arc};
+
+use arrow::{
+    array::{ArrayRef, Int64Array, StringArray},
+    record_batch::RecordBatch,
+};
+use tokio::runtime::Runtime;
+
+use datafusion::physical_plan::{
+    collect,
+    expressions::{col, PhysicalSortExpr},
+    memory::MemoryExec,
+    sort_preserving_merge::SortPreservingMergeExec,
+};
+
+// Initialise the operator using the provided record batches and the sort key
+// as inputs. All record batches must have the same schema.
+fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
+    let schema = batches[0].schema();
+
+    let sort = sort
+        .iter()
+        .map(|name| PhysicalSortExpr {
+            expr: col(name, &schema).unwrap(),
+            options: Default::default(),
+        })
+        .collect::<Vec<_>>();
+
+    let exec = MemoryExec::try_new(
+        &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
+        schema,
+        None,
+    )
+    .unwrap();
+    let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 8192));
+
+    let rt = Runtime::new().unwrap();
+    rt.block_on(collect(merge)).unwrap();
+}
+
+// Produces `n` record batches, each containing `m` rows. All record batches
+// have identical contents unless `batch_offset` is set; in that case the
+// values for column "d" in each subsequent record batch are offset by that
+// amount.
+//
+// The `rows_per_sort_key` value controls how many rows are generated per
+// "sort key", where the sort key is defined as columns a, b and c.
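+//
+// As a concrete illustration of the generator (a hypothetical call, shown
+// only for documentation): `batches(2, 6, 3, 2)` should yield two 6-row
+// batches in which rows 0-2 carry the sort key ("a-0", "b-0", "c-0") and
+// rows 3-5 carry ("a-3", "b-3", "c-3"), while column "d" holds 0..=5 in the
+// first batch and 2..=7 in the second.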
+fn batches(
+    n: usize,
+    m: usize,
+    rows_per_sort_key: usize,
+    batch_offset: usize,
+) -> Vec<RecordBatch> {
+    let mut rbs = Vec::with_capacity(n);
+    let mut curr_batch_offset = 0;
+
+    for _ in 0..n {
+        let mut col_a = Vec::with_capacity(m);
+        let mut col_b = Vec::with_capacity(m);
+        let mut col_c = Vec::with_capacity(m);
+        let mut col_d = Vec::with_capacity(m);
+
+        let mut j = 0;
+        let mut current_rows_per_sort_key = 0;
+
+        for i in 0..m {
+            if current_rows_per_sort_key == rows_per_sort_key {
+                current_rows_per_sort_key = 0;
+                j = i;
+            }
+
+            col_a.push(Some(format!("a-{:?}", j)));
+            col_b.push(Some(format!("b-{:?}", j)));
+            col_c.push(Some(format!("c-{:?}", j)));
+            col_d.push(Some((i + curr_batch_offset) as i64));
+
+            current_rows_per_sort_key += 1;
+        }
+
+        col_a.sort();
+        col_b.sort();
+        col_c.sort();
+
+        let col_a: ArrayRef = Arc::new(StringArray::from_iter(col_a));
+        let col_b: ArrayRef = Arc::new(StringArray::from_iter(col_b));
+        let col_c: ArrayRef = Arc::new(StringArray::from_iter(col_c));
+        let col_d: ArrayRef = Arc::new(Int64Array::from(col_d));
+
+        let rb = RecordBatch::try_from_iter(vec![
+            ("a", col_a),
+            ("b", col_b),
+            ("c", col_c),
+            ("d", col_d),
+        ])
+        .unwrap();
+        rbs.push(rb);
+
+        curr_batch_offset += batch_offset;
+    }
+
+    rbs
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let small_batch = batches(1, 100, 10, 0).remove(0);
+    let large_batch = batches(1, 1000, 1, 0).remove(0);
+
+    let benches = vec![
+        // Two batches with identical rows. They will need to be merged
+        // together, taking one row from each batch in turn until both
+        // batches are drained.
+        ("interleave_batches", batches(2, 1000, 10, 1)),
+        // Two batches with a small overlapping region of rows for each unique
+        // sort key.
+        ("merge_batches_some_overlap_small", batches(2, 1000, 10, 5)),
+        // Two batches with a large overlapping region of rows for each unique
+        // sort key.
+        (
+            "merge_batches_some_overlap_large",
+            batches(2, 1000, 250, 125),
+        ),
+        // Two batches with no overlapping region of rows for each unique
+        // sort key. For a given unique sort key all rows are drained from one
+        // batch, then all the rows for the same key from the second batch.
+        // This repeats until all rows are drained. There are a small number
+        // of rows (10) for each unique sort key.
+        ("merge_batches_no_overlap_small", batches(2, 1000, 10, 12)),
+        // As above, but this time there is a larger number of rows (250) for
+        // each unique sort key - still no overlaps.
+        ("merge_batches_no_overlap_large", batches(2, 1000, 250, 252)),
+        // Merges two batches where one batch is significantly larger than
+        // the other.
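+        // As constructed above, `large_batch` holds 1000 rows each with a
+        // unique sort key, while `small_batch` holds 100 rows with 10 rows
+        // per sort key.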
+        (
+            "merge_batches_small_into_large",
+            vec![large_batch, small_batch],
+        ),
+    ];
+
+    for (name, input) in benches {
+        c.bench_function(name, move |b| {
+            b.iter_batched(
+                || input.clone(),
+                |input| {
+                    sort_preserving_merge_operator(input, &["a", "b", "c", "d"]);
+                },
+                BatchSize::LargeInput,
+            )
+        });
+    }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 316f0509960dd..0949c3c6a8cf6 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -24,22 +24,23 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow::array::{ArrayRef, MutableArrayData};
-use arrow::compute::SortOptions;
+use arrow::{
+    array::{make_array as make_arrow_array, ArrayRef, MutableArrayData},
+    compute::SortOptions,
+    datatypes::SchemaRef,
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
 use async_trait::async_trait;
 use futures::channel::mpsc;
 use futures::stream::FusedStream;
 use futures::{Stream, StreamExt};
 
-use crate::arrow::datatypes::SchemaRef;
-use crate::arrow::error::ArrowError;
-use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::common::spawn_execution;
-use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
-    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
-    RecordBatchStream, SendableRecordBatchStream,
+    common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType,
+    Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 /// Sort preserving merge execution plan
@@ -425,19 +426,38 @@ impl SortPreservingMergeStream {
                     self.in_progress.len(),
                 );
 
-                for row_index in &self.in_progress {
-                    let buffer_idx =
+                if self.in_progress.is_empty() {
+                    return make_arrow_array(array_data.freeze());
+                }
+
+                let first = &self.in_progress[0];
+                let mut buffer_idx =
+                    stream_to_buffer_idx[first.stream_idx] + first.cursor_idx;
+                let mut start_row_idx = first.row_idx;
+                let mut end_row_idx = start_row_idx + 1;
+
+                for row_index in self.in_progress.iter().skip(1) {
+                    let next_buffer_idx =
                         stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
 
-                    // TODO: Coalesce contiguous writes
-                    array_data.extend(
-                        buffer_idx,
-                        row_index.row_idx,
-                        row_index.row_idx + 1,
-                    );
+                    if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx {
+                        // subsequent row in same batch
+                        end_row_idx += 1;
+                        continue;
+                    }
+
+                    // emit current batch of rows for current buffer
+                    array_data.extend(buffer_idx, start_row_idx, end_row_idx);
+
+                    // start new batch of rows
+                    buffer_idx = next_buffer_idx;
+                    start_row_idx = row_index.row_idx;
+                    end_row_idx = start_row_idx + 1;
                 }
 
-                arrow::array::make_array(array_data.freeze())
+                // emit final batch of rows
+                array_data.extend(buffer_idx, start_row_idx, end_row_idx);
+                make_arrow_array(array_data.freeze())
             })
             .collect();
 
@@ -555,7 +575,54 @@ mod tests {
     use tokio_stream::StreamExt;
 
     #[tokio::test]
-    async fn test_merge() {
+    async fn test_merge_interleave() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("c"),
+            Some("e"),
+            Some("g"),
+            Some("j"),
+        ]));
+        let c: ArrayRef =
+            Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("b"),
+            Some("d"),
+            Some("f"),
+            Some("h"),
+            Some("j"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        _test_merge(
+            b1,
+            b2,
+            &[
+                "+----+---+-------------------------------+",
+                "| a  | b | c                             |",
+                "+----+---+-------------------------------+",
+                "| 1  | a | 1970-01-01 00:00:00.000000008 |",
+                "| 10 | b | 1970-01-01 00:00:00.000000004 |",
+                "| 2  | c | 1970-01-01 00:00:00.000000007 |",
+                "| 20 | d | 1970-01-01 00:00:00.000000006 |",
+                "| 7  | e | 1970-01-01 00:00:00.000000006 |",
+                "| 70 | f | 1970-01-01 00:00:00.000000002 |",
+                "| 9  | g | 1970-01-01 00:00:00.000000005 |",
+                "| 90 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 30 | j | 1970-01-01 00:00:00.000000006 |", // input b2 before b1
+                "| 3  | j | 1970-01-01 00:00:00.000000008 |",
+                "+----+---+-------------------------------+",
+            ],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_merge_some_overlap() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
             Some("b"),
@@ -564,21 +631,92 @@ mod tests {
             Some("d"),
             Some("e"),
         ]));
-        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
         let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
 
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30, 100, 110]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("c"),
+            Some("d"),
+            Some("e"),
+            Some("f"),
+            Some("g"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        _test_merge(
+            b1,
+            b2,
+            &[
+                "+-----+---+-------------------------------+",
+                "| a   | b | c                             |",
+                "+-----+---+-------------------------------+",
+                "| 1   | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2   | b | 1970-01-01 00:00:00.000000007 |",
+                "| 70  | c | 1970-01-01 00:00:00.000000004 |",
+                "| 7   | c | 1970-01-01 00:00:00.000000006 |",
+                "| 9   | d | 1970-01-01 00:00:00.000000005 |",
+                "| 90  | d | 1970-01-01 00:00:00.000000006 |",
+                "| 30  | e | 1970-01-01 00:00:00.000000002 |",
+                "| 3   | e | 1970-01-01 00:00:00.000000008 |",
+                "| 100 | f | 1970-01-01 00:00:00.000000002 |",
+                "| 110 | g | 1970-01-01 00:00:00.000000006 |",
+                "+-----+---+-------------------------------+",
+            ],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_merge_no_overlap() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
             Some("d"),
             Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("f"),
             Some("g"),
             Some("h"),
             Some("i"),
+            Some("j"),
         ]));
         let c: ArrayRef =
             Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
         let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
 
-        let schema = b1.schema();
+        _test_merge(
+            b1,
+            b2,
+            &[
+                "+----+---+-------------------------------+",
+                "| a  | b | c                             |",
+                "+----+---+-------------------------------+",
+                "| 1  | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2  | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7  | c | 1970-01-01 00:00:00.000000006 |",
+                "| 9  | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3  | e | 1970-01-01 00:00:00.000000008 |",
+                "| 10 | f | 1970-01-01 00:00:00.000000004 |",
+                "| 20 | g | 1970-01-01 00:00:00.000000006 |",
+                "| 70 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 90 | i | 1970-01-01 00:00:00.000000002 |",
+                "| 30 | j | 1970-01-01 00:00:00.000000006 |",
+                "+----+---+-------------------------------+",
+            ],
+        )
+        .await;
+    }
+
+    async fn _test_merge(b1: RecordBatch, b2: RecordBatch, exp: &[&str]) {
+        let schema = b1.schema();
         let sort = vec![
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
@@ -595,25 +733,7 @@ mod tests {
         let collected = collect(merge).await.unwrap();
         assert_eq!(collected.len(), 1);
 
-        assert_batches_eq!(
-            &[
-                "+---+---+-------------------------------+",
-                "| a | b | c                             |",
-                "+---+---+-------------------------------+",
-                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
-                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
-                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
-                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
-                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
-                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
-                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
-                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
-                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
-                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
-                "+---+---+-------------------------------+",
-            ],
-            collected.as_slice()
-        );
+        assert_batches_eq!(exp, collected.as_slice());
     }
 
     async fn sorted_merge(