From 343b82d9507f51ff5e112d9b163c9a68e28d2de7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 6 Apr 2023 14:41:05 +0100 Subject: [PATCH 1/2] Remove SortKeyCursor --- .../core/src/physical_plan/sorts/cursor.rs | 76 ++---- .../core/src/physical_plan/sorts/merge.rs | 29 ++- .../core/src/physical_plan/sorts/mod.rs | 1 - .../core/src/physical_plan/sorts/stream.rs | 22 +- datafusion/core/tests/sort_key_cursor.rs | 217 ------------------ 5 files changed, 47 insertions(+), 298 deletions(-) delete mode 100644 datafusion/core/tests/sort_key_cursor.rs diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs b/datafusion/core/src/physical_plan/sorts/cursor.rs index 8ab2acdda4c6..3507a5b224d2 100644 --- a/datafusion/core/src/physical_plan/sorts/cursor.rs +++ b/datafusion/core/src/physical_plan/sorts/cursor.rs @@ -18,24 +18,15 @@ use arrow::row::{Row, Rows}; use std::cmp::Ordering; -/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of -/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort keys. -/// -/// Additionally it maintains a row cursor that can be advanced through the rows -/// of the provided `RecordBatch` -/// -/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to -/// by this row cursor, with that of another `SortKeyCursor`. A cursor stores -/// a row comparator for each other cursor that it is compared to. -pub struct SortKeyCursor { - stream_idx: usize, +/// A [`Cursor`] for [`Rows`] +pub struct RowCursor { cur_row: usize, num_rows: usize, rows: Rows, } -impl std::fmt::Debug for SortKeyCursor { +impl std::fmt::Debug for RowCursor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SortKeyCursor") .field("cur_row", &self.cur_row) @@ -44,70 +35,39 @@ impl std::fmt::Debug for SortKeyCursor { } } -impl SortKeyCursor { +impl RowCursor { /// Create a new SortKeyCursor - pub fn new(stream_idx: usize, rows: Rows) -> Self { + pub fn new(rows: Rows) -> Self { Self { - stream_idx, cur_row: 0, num_rows: rows.num_rows(), rows, } } - #[inline(always)] - /// Return the stream index of this cursor - pub fn stream_idx(&self) -> usize { - self.stream_idx - } - - #[inline(always)] - /// Return true if the stream is finished - pub fn is_finished(&self) -> bool { - self.num_rows == self.cur_row - } - - #[inline(always)] - /// Returns the cursor's current row, and advances the cursor to the next row - pub fn advance(&mut self) -> usize { - assert!(!self.is_finished()); - let t = self.cur_row; - self.cur_row += 1; - t - } - /// Returns the current row fn current(&self) -> Row<'_> { self.rows.row(self.cur_row) } } -impl PartialEq for SortKeyCursor { +impl PartialEq for RowCursor { fn eq(&self, other: &Self) -> bool { self.current() == other.current() } } -impl Eq for SortKeyCursor {} +impl Eq for RowCursor {} -impl PartialOrd for SortKeyCursor { +impl PartialOrd for RowCursor { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for SortKeyCursor { +impl Ord for RowCursor { fn cmp(&self, other: &Self) -> Ordering { - // Order finished cursors greater (last) - match (self.is_finished(), other.is_finished()) { - (true, true) => Ordering::Equal, - (_, true) => Ordering::Less, - (true, _) => Ordering::Greater, - _ => self - .current() - .cmp(&other.current()) - .then_with(|| self.stream_idx.cmp(&other.stream_idx)), - } + self.current().cmp(&other.current()) } } @@ -117,17 +77,19 @@ pub trait Cursor: Ord { fn is_finished(&self) -> bool; /// Advance the cursor, returning the previous row index - /// - /// Returns `None` if [`Self::is_finished`] - fn advance(&mut self) -> Option; + fn advance(&mut self) -> usize; } -impl Cursor for SortKeyCursor { +impl Cursor for RowCursor { + #[inline] fn is_finished(&self) -> bool { - self.is_finished() + self.num_rows == self.cur_row } - fn advance(&mut self) -> Option { - (!self.is_finished()).then(|| self.advance()) + #[inline] + fn advance(&mut self) -> usize { + let t = self.cur_row; + self.cur_row += 1; + t } } diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index d6b43db82b73..69c3fb82c2d2 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -19,7 +19,7 @@ use crate::common::Result; use crate::physical_plan::metrics::MemTrackingMetrics; use crate::physical_plan::sorts::builder::BatchBuilder; use crate::physical_plan::sorts::cursor::Cursor; -use crate::physical_plan::sorts::stream::{PartitionedStream, SortKeyCursorStream}; +use crate::physical_plan::sorts::stream::{PartitionedStream, RowCursorStream}; use crate::physical_plan::{ PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, }; @@ -37,7 +37,7 @@ pub(crate) fn streaming_merge( tracking_metrics: MemTrackingMetrics, batch_size: usize, ) -> Result { - let streams = SortKeyCursorStream::try_new(schema.as_ref(), expressions, streams)?; + let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?; Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), @@ -119,11 +119,7 @@ impl SortPreservingMergeStream { cx: &mut Context<'_>, idx: usize, ) -> Poll> { - if self.cursors[idx] - .as_ref() - .map(|cursor| !cursor.is_finished()) - .unwrap_or(false) - { + if self.cursors[idx].is_some() { // Cursor is not finished - don't need a new RecordBatch yet return Poll::Ready(Ok(())); } @@ -176,8 +172,7 @@ impl SortPreservingMergeStream { } let stream_idx = self.loser_tree[0]; - let cursor = self.cursors[stream_idx].as_mut(); - if let Some(row_idx) = cursor.and_then(Cursor::advance) { + if let Some(row_idx) = self.advance(stream_idx) { self.loser_tree_adjusted = false; self.in_progress.push_row(stream_idx, row_idx); if self.in_progress.len() < self.batch_size { @@ -189,13 +184,27 @@ impl SortPreservingMergeStream { } } + fn advance(&mut self, stream_idx: usize) -> Option { + let slot = &mut self.cursors[stream_idx]; + match slot.as_mut() { + Some(c) => { + let ret = c.advance(); + if c.is_finished() { + *slot = None; + } + Some(ret) + } + None => None, + } + } + /// Returns `true` if the cursor at index `a` is greater than at index `b` #[inline] fn is_gt(&self, a: usize, b: usize) -> bool { match (&self.cursors[a], &self.cursors[b]) { (None, _) => true, (_, None) => false, - (Some(a), Some(b)) => b < a, + (Some(ac), Some(bc)) => ac.cmp(bc).then_with(|| a.cmp(&b)).is_gt(), } } diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/mod.rs index cd5dae27dcc7..567de96c1cfd 100644 --- a/datafusion/core/src/physical_plan/sorts/mod.rs +++ b/datafusion/core/src/physical_plan/sorts/mod.rs @@ -25,6 +25,5 @@ pub mod sort; pub mod sort_preserving_merge; mod stream; -pub use cursor::SortKeyCursor; pub use index::RowIndex; pub(crate) use merge::streaming_merge; diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index 1bc046042ee7..3fe68624f7c1 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -16,7 +16,7 @@ // under the License. use crate::common::Result; -use crate::physical_plan::sorts::cursor::SortKeyCursor; +use crate::physical_plan::sorts::cursor::RowCursor; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::Schema; @@ -73,9 +73,9 @@ impl FusedStreams { } /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] -/// and computes [`SortKeyCursor`] based on the provided [`PhysicalSortExpr`] +/// and computes [`RowCursor`] based on the provided [`PhysicalSortExpr`] #[derive(Debug)] -pub(crate) struct SortKeyCursorStream { +pub(crate) struct RowCursorStream { /// Converter to convert output of physical expressions converter: RowConverter, /// The physical expressions to sort by @@ -84,7 +84,7 @@ pub(crate) struct SortKeyCursorStream { streams: FusedStreams, } -impl SortKeyCursorStream { +impl RowCursorStream { pub(crate) fn try_new( schema: &Schema, expressions: &[PhysicalSortExpr], @@ -107,11 +107,7 @@ impl SortKeyCursorStream { }) } - fn convert_batch( - &mut self, - batch: &RecordBatch, - stream_idx: usize, - ) -> Result { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result { let cols = self .column_expressions .iter() @@ -119,12 +115,12 @@ impl SortKeyCursorStream { .collect::>>()?; let rows = self.converter.convert_columns(&cols)?; - Ok(SortKeyCursor::new(stream_idx, rows)) + Ok(RowCursor::new(rows)) } } -impl PartitionedStream for SortKeyCursorStream { - type Output = Result<(SortKeyCursor, RecordBatch)>; +impl PartitionedStream for RowCursorStream { + type Output = Result<(RowCursor, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() @@ -137,7 +133,7 @@ impl PartitionedStream for SortKeyCursorStream { ) -> Poll> { Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| { r.and_then(|batch| { - let cursor = self.convert_batch(&batch, stream_idx)?; + let cursor = self.convert_batch(&batch)?; Ok((cursor, batch)) }) })) diff --git a/datafusion/core/tests/sort_key_cursor.rs b/datafusion/core/tests/sort_key_cursor.rs deleted file mode 100644 index 0ae641bfb6d7..000000000000 --- a/datafusion/core/tests/sort_key_cursor.rs +++ /dev/null @@ -1,217 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Contains tests for SortKeyCursor - -use std::{cmp::Ordering, sync::Arc}; - -use arrow::datatypes::DataType; -use arrow::row::{RowConverter, SortField}; -use arrow::{array::Int64Array, record_batch::RecordBatch}; -use datafusion::physical_plan::sorts::{RowIndex, SortKeyCursor}; - -#[test] -fn test_single_column() { - let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap(); - let batch1 = int64_batch(vec![Some(1), Some(2), Some(5), Some(6)]); - let batch2 = int64_batch(vec![Some(3), Some(4), Some(8), Some(9)]); - - let mut cursor1 = CursorBuilder::new(batch1) - .with_stream_idx(11) - .build(&mut converter); - - let mut cursor2 = CursorBuilder::new(batch2) - .with_stream_idx(22) - .build(&mut converter); - - let expected = vec![ - "11: (0, 0)", - "11: (0, 1)", - "22: (0, 0)", - "22: (0, 1)", - "11: (0, 2)", - "11: (0, 3)", - "22: (0, 2)", - "22: (0, 3)", - ]; - - assert_indexes(expected, run(&mut cursor1, &mut cursor2)); -} - -#[test] -fn test_stable_compare() { - let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)]).unwrap(); - // Validate ties are broken by the lower stream idx to ensure stable sort - let batch1 = int64_batch(vec![Some(3), Some(4)]); - let batch2 = int64_batch(vec![Some(3)]); - - let cursor1 = CursorBuilder::new(batch1) - // higher stream index - .with_stream_idx(33); - - let cursor2 = CursorBuilder::new(batch2) - // Lower stream index -- should always be first - .with_stream_idx(22); - - let expected = vec!["22: (0, 0)", "33: (0, 0)", "33: (0, 1)"]; - - // Output should be the same, regardless of order - assert_indexes( - &expected, - run( - &mut cursor1.clone().build(&mut converter), - &mut cursor2.clone().build(&mut converter), - ), - ); - assert_indexes( - &expected, - run( - &mut cursor2.build(&mut converter), - &mut cursor1.build(&mut converter), - ), - ); -} - -/// Runs the two cursors to completion, sorting them, and -/// returning the sorted order of rows that would have produced -fn run(cursor1: &mut SortKeyCursor, cursor2: &mut SortKeyCursor) -> Vec { - let mut indexes = vec![]; - loop { - println!( - "(cursor1.is_finished(), cursor2.is_finished()): ({}, {})", - cursor1.is_finished(), - cursor2.is_finished() - ); - - match (cursor1.is_finished(), cursor2.is_finished()) { - (true, true) => return indexes, - (true, false) => return drain(cursor2, indexes), - (false, true) => return drain(cursor1, indexes), - // both cursors have more rows - (false, false) => match cursor1.cmp(&cursor2) { - Ordering::Less => { - indexes.push(advance(cursor1)); - } - Ordering::Equal => { - indexes.push(advance(cursor1)); - indexes.push(advance(cursor2)); - } - Ordering::Greater => { - indexes.push(advance(cursor2)); - } - }, - } - } -} - -// Advance the cursor and return the RowIndex created -fn advance(cursor: &mut SortKeyCursor) -> RowIndex { - let row_idx = cursor.advance(); - RowIndex { - stream_idx: cursor.stream_idx(), - batch_idx: 0, - row_idx, - } -} - -// Drain remaining items in the cursor, appending result to indexes -fn drain(cursor: &mut SortKeyCursor, mut indexes: Vec) -> Vec { - while !cursor.is_finished() { - indexes.push(advance(cursor)) - } - indexes -} - -/// Return the values as an [`Int64Array`] single record batch, with -/// column "c1" -fn int64_batch(values: impl IntoIterator>) -> RecordBatch { - let array: Int64Array = values.into_iter().collect(); - RecordBatch::try_from_iter(vec![("c1", Arc::new(array) as _)]).unwrap() -} - -/// helper for creating cursors to test -#[derive(Debug, Clone)] -struct CursorBuilder { - batch: RecordBatch, - stream_idx: Option, -} - -impl CursorBuilder { - fn new(batch: RecordBatch) -> Self { - Self { - batch, - stream_idx: None, - } - } - - /// Set the stream index - fn with_stream_idx(mut self, stream_idx: usize) -> Self { - self.stream_idx = Some(stream_idx); - self - } - - fn build(self, converter: &mut RowConverter) -> SortKeyCursor { - let Self { batch, stream_idx } = self; - let rows = converter.convert_columns(batch.columns()).unwrap(); - SortKeyCursor::new(stream_idx.expect("stream idx not set"), rows) - } -} - -/// Compares [`RowIndex`]es with a vector of strings, the result of -/// pretty formatting the [`RowIndex`]es. -/// -/// Designed so that failure output can be directly copy/pasted -/// into the test code as expected results. -fn assert_indexes( - expected_indexes: impl IntoIterator>, - indexes: impl IntoIterator, -) { - let expected_lines: Vec<_> = expected_indexes - .into_iter() - .map(|s| s.as_ref().to_string()) - .collect(); - - let actual_lines = format_as_strings(indexes); - - assert_eq!( - expected_lines, actual_lines, - "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n" - ); -} - -/// Formats an terator of RowIndexes into strings for comparisons -/// -/// ```text -/// stream: (batch, index) -/// ``` -/// -/// for example, -/// ```text -/// 1: (0, 2) -/// ``` -/// means "Stream 1, batch id 0, row index 2" -fn format_as_strings(indexes: impl IntoIterator) -> Vec { - indexes - .into_iter() - .map(|row_index| { - format!( - "{}: ({}, {})", - row_index.stream_idx, row_index.batch_idx, row_index.row_idx - ) - }) - .collect() -} From 5ba66d548d88b235b5a0051b920d5df91d73b7b0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 7 Apr 2023 15:28:32 +0100 Subject: [PATCH 2/2] Format --- datafusion/core/src/physical_plan/sorts/merge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index d3215f4fb5d9..502c37e64173 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -172,7 +172,7 @@ impl SortPreservingMergeStream { } let stream_idx = self.loser_tree[0]; - if self.advance(stream_idx) { + if self.advance(stream_idx) { self.loser_tree_adjusted = false; self.in_progress.push_row(stream_idx); if self.in_progress.len() < self.batch_size {