From 59e16934d15e3e507be9b618dee42833ebb59203 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 3 Apr 2023 16:56:52 +0100 Subject: [PATCH] Use SortPreservingMerge for in memory sort --- .../core/src/physical_plan/sorts/sort.rs | 332 ++---------------- 1 file changed, 35 insertions(+), 297 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 6037bc5c8b626..2a4545ea65c2b 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -25,7 +25,7 @@ use crate::execution::memory_pool::{ human_readable_size, MemoryConsumer, MemoryReservation, }; use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; +use crate::physical_plan::common::{batch_byte_size, IPCWriter}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet, @@ -34,28 +34,27 @@ use crate::physical_plan::sorts::merge::streaming_merge; use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use crate::physical_plan::{ DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, - RecordBatchStream, SendableRecordBatchStream, Statistics, + SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionConfig; -use arrow::array::{make_array, Array, ArrayRef, MutableArrayData}; +use arrow::array::ArrayRef; pub use arrow::compute::SortOptions; -use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; +use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use datafusion_physical_expr::EquivalenceProperties; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use log::{debug, error}; use std::any::Any; -use std::cmp::{min, Ordering}; +use std::cmp::Ordering; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::task::{Context, Poll}; use tempfile::NamedTempFile; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task; @@ -71,7 +70,7 @@ use tokio::task; /// 3. when input is exhausted, merge all in memory batches and spills to get a total order. struct ExternalSorter { schema: SchemaRef, - in_mem_batches: Vec, + in_mem_batches: Vec, spills: Vec, /// Sort expressions expr: Vec, @@ -131,12 +130,13 @@ impl ExternalSorter { // NB timer records time taken on drop, so there are no // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); - let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?; + let sorted_batch = + sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?; // The resulting batch might be smaller (or larger, see #3747) than the input // batch due to either a propagated limit or the re-construction of arrays. So // for being reliable, we need to reflect the memory usage of the partial batch. - let new_size = batch_byte_size(&partial.sorted_batch); + let new_size = batch_byte_size(&sorted_batch); match new_size.cmp(&size) { Ordering::Greater => { // We don't have to call try_grow here, since we have already used the @@ -154,7 +154,7 @@ impl ExternalSorter { } Ordering::Equal => {} } - self.in_mem_batches.push(partial); + self.in_mem_batches.push(sorted_batch); } Ok(()) } @@ -278,279 +278,38 @@ impl Debug for ExternalSorter { /// consume the non-empty `sorted_batches` and do in_mem_sort fn in_mem_partial_sort( - buffered_batches: &mut Vec, + buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], batch_size: usize, - tracking_metrics: MemTrackingMetrics, - fetch: Option, + mut tracking_metrics: MemTrackingMetrics, + _fetch: Option, ) -> Result { - assert_ne!(buffered_batches.len(), 0); - if buffered_batches.len() == 1 { - let result = buffered_batches.pop(); - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(result.unwrap().sorted_batch)], - tracking_metrics, - ))) - } else { - let (sorted_arrays, batches): (Vec>, Vec) = - buffered_batches - .drain(..) - .map(|b| { - let BatchWithSortArray { - sort_arrays, - sorted_batch: batch, - } = b; - (sort_arrays, batch) - }) - .unzip(); - - let sorted_iter = { - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let _timer = tracking_metrics.elapsed_compute().timer(); - get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)? - }; - Ok(Box::pin(SortedSizedRecordBatchStream::new( - schema, - batches, - sorted_iter, - tracking_metrics, - ))) + if buffered_batches.len() < 2 { + let batches: Vec<_> = buffered_batches.drain(..).collect(); + return Ok(Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(batches.into_iter().map(move |batch| { + tracking_metrics.output_rows().add(batch.num_rows()); + Ok(batch) + })), + ))); } -} - -#[derive(Debug, Copy, Clone)] -struct CompositeIndex { - batch_idx: u32, - row_idx: u32, -} - -/// Get sorted iterator by sort concatenated `SortColumn`s -fn get_sorted_iter( - sort_arrays: &[Vec], - expr: &[PhysicalSortExpr], - batch_size: usize, - fetch: Option, -) -> Result { - let row_indices = sort_arrays - .iter() - .enumerate() - .flat_map(|(i, arrays)| { - (0..arrays[0].len()).map(move |r| CompositeIndex { - // since we original use UInt32Array to index the combined mono batch, - // component record batches won't overflow as well, - // use u32 here for space efficiency. - batch_idx: i as u32, - row_idx: r as u32, - }) - }) - .collect::>(); - let sort_columns = expr - .iter() - .enumerate() - .map(|(i, expr)| { - let columns_i = sort_arrays - .iter() - .map(|cs| cs[i].as_ref()) - .collect::>(); - Ok(SortColumn { - values: concat(columns_i.as_slice())?, - options: Some(expr.options), - }) + let streams = buffered_batches + .drain(..) + .map(|batch| { + Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::once(futures::future::ready(Ok(batch))), + )) as _ }) - .collect::>>()?; - let indices = lexsort_to_indices(&sort_columns, fetch)?; - - // Calculate composite index based on sorted indices - let row_indices = indices - .values() - .iter() - .map(|i| row_indices[*i as usize]) .collect(); - Ok(SortedIterator::new(row_indices, batch_size)) -} + // TODO: More accurate, dynamic memory accounting (#5885) + tracking_metrics.init_mem_used(batch_size); -struct SortedIterator { - /// Current logical position in the iterator - pos: usize, - /// Sorted composite index of where to find the rows in buffered batches - composite: Vec, - /// Maximum batch size to produce - batch_size: usize, -} - -impl SortedIterator { - fn new(composite: Vec, batch_size: usize) -> Self { - Self { - pos: 0, - composite, - batch_size, - } - } - - fn memory_size(&self) -> usize { - std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..]) - } -} - -impl Iterator for SortedIterator { - type Item = Vec; - - /// Emit a max of `batch_size` positions each time - fn next(&mut self) -> Option { - let length = self.composite.len(); - if self.pos >= length { - return None; - } - - let current_size = min(self.batch_size, length - self.pos); - - // Combine adjacent indexes from the same batch to make a slice, - // for more efficient `extend` later. - let mut last_batch_idx = self.composite[self.pos].batch_idx; - let mut indices_in_batch = Vec::with_capacity(current_size); - - let mut slices = vec![]; - for ci in &self.composite[self.pos..self.pos + current_size] { - if ci.batch_idx != last_batch_idx { - group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); - last_batch_idx = ci.batch_idx; - } - indices_in_batch.push(ci.row_idx); - } - - assert!( - !indices_in_batch.is_empty(), - "There should have at least one record in a sort output slice." - ); - group_indices(last_batch_idx, &mut indices_in_batch, &mut slices); - - self.pos += current_size; - Some(slices) - } -} - -/// Group continuous indices into a slice for better `extend` performance -fn group_indices( - batch_idx: u32, - positions: &mut Vec, - output: &mut Vec, -) { - positions.sort_unstable(); - let mut last_pos = 0; - let mut run_length = 0; - for pos in positions.iter() { - if run_length == 0 { - last_pos = *pos; - run_length = 1; - } else if *pos == last_pos + 1 { - run_length += 1; - last_pos = *pos; - } else { - output.push(CompositeSlice { - batch_idx, - start_row_idx: last_pos + 1 - run_length, - len: run_length as usize, - }); - last_pos = *pos; - run_length = 1; - } - } - assert!( - run_length > 0, - "There should have at least one record in a sort output slice." - ); - output.push(CompositeSlice { - batch_idx, - start_row_idx: last_pos + 1 - run_length, - len: run_length as usize, - }); - positions.clear() -} - -/// Stream of sorted record batches -struct SortedSizedRecordBatchStream { - schema: SchemaRef, - batches: Vec, - sorted_iter: SortedIterator, - num_cols: usize, - metrics: MemTrackingMetrics, -} - -impl SortedSizedRecordBatchStream { - /// new - pub fn new( - schema: SchemaRef, - batches: Vec, - sorted_iter: SortedIterator, - mut metrics: MemTrackingMetrics, - ) -> Self { - let size = batches.iter().map(batch_byte_size).sum::() - + sorted_iter.memory_size(); - metrics.init_mem_used(size); - let num_cols = batches[0].num_columns(); - SortedSizedRecordBatchStream { - schema, - batches, - sorted_iter, - num_cols, - metrics, - } - } -} - -impl Stream for SortedSizedRecordBatchStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - match self.sorted_iter.next() { - None => Poll::Ready(None), - Some(slices) => { - let num_rows = slices.iter().map(|s| s.len).sum(); - let output = (0..self.num_cols) - .map(|i| { - let arrays = self - .batches - .iter() - .map(|b| b.column(i).data()) - .collect::>(); - let mut mutable = MutableArrayData::new(arrays, false, num_rows); - for x in slices.iter() { - mutable.extend( - x.batch_idx as usize, - x.start_row_idx as usize, - x.start_row_idx as usize + x.len, - ); - } - make_array(mutable.freeze()) - }) - .collect::>(); - let batch = - RecordBatch::try_new(self.schema.clone(), output).map_err(Into::into); - let poll = Poll::Ready(Some(batch)); - self.metrics.record_poll(poll) - } - } - } -} - -struct CompositeSlice { - batch_idx: u32, - start_row_idx: u32, - len: usize, -} - -impl RecordBatchStream for SortedSizedRecordBatchStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } + streaming_merge(streams, schema, expressions, tracking_metrics, batch_size) } async fn spill_partial_sorted_stream( @@ -844,17 +603,12 @@ impl ExecutionPlan for SortExec { } } -struct BatchWithSortArray { - sort_arrays: Vec, - sorted_batch: RecordBatch, -} - fn sort_batch( batch: RecordBatch, schema: SchemaRef, expr: &[PhysicalSortExpr], fetch: Option, -) -> Result { +) -> Result { let sort_columns = expr .iter() .map(|e| e.evaluate_to_sort_column(&batch)) @@ -882,23 +636,7 @@ fn sort_batch( .collect::, ArrowError>>()?, )?; - let sort_arrays = sort_columns - .into_iter() - .map(|sc| { - Ok(take( - sc.values.as_ref(), - &indices, - Some(TakeOptions { - check_bounds: false, - }), - )?) - }) - .collect::>>()?; - - Ok(BatchWithSortArray { - sort_arrays, - sorted_batch, - }) + Ok(sorted_batch) } async fn do_sort(