From 700a53df22830597816ca7e665452c882aea03ee Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Aug 2023 12:27:31 -0700 Subject: [PATCH 1/6] feat(7181): add slice() to Cursor trait --- datafusion/physical-plan/src/sorts/cursor.rs | 52 ++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index baa417649fb08..9cdb9eb6eb624 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp; use arrow::row::{Row, Rows}; use arrow_array::types::ByteArrayType; use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; +use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use std::cmp::Ordering; @@ -99,6 +100,13 @@ pub trait Cursor: Ord { /// Advance the cursor, returning the previous row index fn advance(&mut self) -> usize; + + /// Slice the cursor at a given row index, returning a new cursor + /// + /// Returns an error if the slice is out of bounds, or memory is insufficient + fn slice(&self, offset: usize, length: usize) -> Result + where + Self: Sized; } impl Cursor for RowCursor { @@ -113,6 +121,14 @@ impl Cursor for RowCursor { self.cur_row += 1; t } + + #[inline] + fn slice(&self, offset: usize, length: usize) -> Result { + let rows = self.rows.slice(offset, length); + let mut reservation = self.reservation.new_empty(); + reservation.try_grow(rows.size())?; + Ok(Self::new(rows, reservation)) + } } /// An [`Array`] that can be converted into [`FieldValues`] @@ -131,6 +147,10 @@ pub trait FieldValues { fn compare(a: &Self::Value, b: &Self::Value) -> Ordering; fn value(&self, idx: usize) -> &Self::Value; + + fn slice(&self, offset: usize, length: usize) -> Result + where + Self: Sized; } impl FieldArray for PrimitiveArray { @@ -160,6 +180,14 @@ impl FieldValues for PrimitiveValues { fn 
value(&self, idx: usize) -> &Self::Value { &self.0[idx] } + + #[inline] + fn slice(&self, offset: usize, length: usize) -> Result { + if offset + length > self.len() { + return Err(DataFusionError::Internal("slice out of bounds".into())); + } + Ok(Self(self.0.slice(offset, length))) + } } impl FieldArray for GenericByteArray { @@ -191,6 +219,14 @@ impl FieldValues for GenericByteArray { fn value(&self, idx: usize) -> &Self::Value { self.value(idx) } + + #[inline] + fn slice(&self, offset: usize, length: usize) -> Result { + if offset + length > Array::len(self) { + return Err(DataFusionError::Internal("slice out of bounds".into())); + } + Ok(self.slice(offset, length)) + } } /// A cursor over sorted, nullable [`FieldValues`] @@ -275,6 +311,22 @@ impl Cursor for FieldCursor { self.offset += 1; t } + + fn slice(&self, offset: usize, length: usize) -> Result { + let FieldCursor { + values, + offset: _, + null_threshold, + options, + } = self; + + Ok(Self { + values: values.slice(offset, length)?, + offset: 0, + null_threshold: *null_threshold, + options: *options, + }) + } } #[cfg(test)] From 0c66196d039b61d5f849cac91d0a2c0ff3e9fec5 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 14 Sep 2023 20:42:27 -0700 Subject: [PATCH 2/6] fix(7181): have RowCursor slicing be within a single arc-refed Rows --- datafusion/physical-plan/src/sorts/cursor.rs | 41 ++++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 9cdb9eb6eb624..7a367dc803e0d 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -23,14 +23,15 @@ use arrow_array::types::ByteArrayType; use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; -use std::cmp::Ordering; +use std::{cmp::Ordering,
sync::Arc}; /// A [`Cursor`] for [`Rows`] pub struct RowCursor { cur_row: usize, - num_rows: usize, + row_offset: usize, + row_limit: usize, // exclusive [offset..limit] - rows: Rows, + rows: Arc, /// Tracks for the memory used by in the `Rows` of this /// cursor. Freed on drop @@ -42,7 +43,7 @@ impl std::fmt::Debug for RowCursor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SortKeyCursor") .field("cur_row", &self.cur_row) - .field("num_rows", &self.num_rows) + .field("num_rows", &self.num_rows()) .finish() } } @@ -61,8 +62,9 @@ impl RowCursor { ); Self { cur_row: 0, - num_rows: rows.num_rows(), - rows, + row_offset: 0, + row_limit: rows.num_rows(), + rows: Arc::new(rows), reservation, } } @@ -107,27 +109,38 @@ pub trait Cursor: Ord { fn slice(&self, offset: usize, length: usize) -> Result where Self: Sized; + + /// Returns the number of rows in this cursor + fn num_rows(&self) -> usize; } impl Cursor for RowCursor { #[inline] fn is_finished(&self) -> bool { - self.num_rows == self.cur_row + self.cur_row >= self.row_limit } #[inline] fn advance(&mut self) -> usize { let t = self.cur_row; self.cur_row += 1; - t + t - self.row_offset } #[inline] fn slice(&self, offset: usize, length: usize) -> Result { - let rows = self.rows.slice(offset, length); - let mut reservation = self.reservation.new_empty(); - reservation.try_grow(rows.size())?; - Ok(Self::new(rows, reservation)) + Ok(Self { + cur_row: self.row_offset + offset, + row_offset: self.row_offset + offset, + row_limit: self.row_offset + offset + length, + rows: self.rows.clone(), + reservation: self.reservation.new_empty(), // Arc cloning of Rows is cheap + }) + } + + #[inline] + fn num_rows(&self) -> usize { + self.row_limit - self.row_offset } } @@ -327,6 +340,10 @@ impl Cursor for FieldCursor { options: *options, }) } + + fn num_rows(&self) -> usize { + self.values.len() + } } #[cfg(test)] From 78e154cd2d487845c773a5ed641d58c19a738332 Mon Sep 17 00:00:00 2001 From: wiedld 
Date: Wed, 11 Oct 2023 17:46:19 -0400 Subject: [PATCH 3/6] test(7181): cursor slicing --- datafusion/physical-plan/src/sorts/cursor.rs | 52 ++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 7a367dc803e0d..7845568dad578 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -501,4 +501,56 @@ mod tests { b.advance(); assert_eq!(a.cmp(&b), Ordering::Less); } + + #[test] + fn test_slice_primitive() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let buffer = ScalarBuffer::from(vec![0, 1, 2]); + let mut cursor = new_primitive(options, buffer, 0); + + // from start + let sliced = cursor.slice(0, 1).expect("slice should be successful"); + assert_eq!(sliced.num_rows(), 1); + let expected = new_primitive(options, ScalarBuffer::from(vec![0]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should slice from start" + ); + + // with offset + let sliced = cursor.slice(1, 2).expect("slice should be successful"); + assert_eq!(sliced.num_rows(), 2); + let expected = new_primitive(options, ScalarBuffer::from(vec![1]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should slice with offset" + ); + + // cursor current position != start + cursor.advance(); + let sliced = cursor.slice(0, 1).expect("slice should be successful"); + assert_eq!(sliced.num_rows(), 1); + let expected = new_primitive(options, ScalarBuffer::from(vec![0]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should ignore current cursor position when sliced" + ); + + // out of bounds + assert!( + cursor.slice(0, 42).is_err(), + "should return err when slice is out of bounds" + ); + assert!( + cursor.slice(42, 1).is_err(), + "should return err when slice is out of bounds" + ); + } } From 6a946a5af782ddff6f47e1c2b9133abe93c6bb7f Mon Sep 17 00:00:00 2001 From: 
wiedld Date: Thu, 12 Oct 2023 13:56:02 -0400 Subject: [PATCH 4/6] fix(7181): cursor slice should panic, not return result --- datafusion/physical-plan/src/sorts/cursor.rs | 70 +++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 7845568dad578..95a1a7a0a215a 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -21,7 +21,6 @@ use arrow::datatypes::ArrowNativeTypeOp; use arrow::row::{Row, Rows}; use arrow_array::types::ByteArrayType; use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; -use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use std::{cmp::Ordering, sync::Arc}; @@ -105,8 +104,10 @@ pub trait Cursor: Ord { /// Slice the cursor at a given row index, returning a new cursor /// - /// Returns an error if the slice is out of bounds, or memory is insufficient - fn slice(&self, offset: usize, length: usize) -> Result + /// # Panics + /// + /// Panics if the slice is out of bounds, or memory is insufficient + fn slice(&self, offset: usize, length: usize) -> Self where Self: Sized; @@ -128,14 +129,14 @@ impl Cursor for RowCursor { } #[inline] - fn slice(&self, offset: usize, length: usize) -> Result { - Ok(Self { + fn slice(&self, offset: usize, length: usize) -> Self { + Self { cur_row: self.row_offset + offset, row_offset: self.row_offset + offset, row_limit: self.row_offset + offset + length, rows: self.rows.clone(), reservation: self.reservation.new_empty(), // Arc cloning of Rows is cheap - }) + } } #[inline] @@ -161,7 +162,7 @@ pub trait FieldValues { fn value(&self, idx: usize) -> &Self::Value; - fn slice(&self, offset: usize, length: usize) -> Result + fn slice(&self, offset: usize, length: usize) -> Self where Self: Sized; } @@ -195,11 +196,9 @@ impl FieldValues for PrimitiveValues { } 
#[inline] - fn slice(&self, offset: usize, length: usize) -> Result { - if offset + length > self.len() { - return Err(DataFusionError::Internal("slice out of bounds".into())); - } - Ok(Self(self.0.slice(offset, length))) + fn slice(&self, offset: usize, length: usize) -> Self { + assert!(offset + length <= self.len(), "cursor slice out of bounds"); + Self(self.0.slice(offset, length)) } } @@ -234,11 +233,12 @@ impl FieldValues for GenericByteArray { } #[inline] - fn slice(&self, offset: usize, length: usize) -> Result { - if offset + length > Array::len(self) { - return Err(DataFusionError::Internal("slice out of bounds".into())); - } - Ok(self.slice(offset, length)) + fn slice(&self, offset: usize, length: usize) -> Self { + assert!( + offset + length <= Array::len(self), + "cursor slice out of bounds" + ); + self.slice(offset, length) } } @@ -325,7 +325,7 @@ impl Cursor for FieldCursor { t } - fn slice(&self, offset: usize, length: usize) -> Result { + fn slice(&self, offset: usize, length: usize) -> Self { let FieldCursor { values, offset: _, @@ -333,12 +333,12 @@ impl Cursor for FieldCursor { options, } = self; - Ok(Self { - values: values.slice(offset, length)?, + Self { + values: values.slice(offset, length), offset: 0, null_threshold: *null_threshold, options: *options, - }) + } } fn num_rows(&self) -> usize { @@ -513,7 +513,7 @@ mod tests { let mut cursor = new_primitive(options, buffer, 0); // from start - let sliced = cursor.slice(0, 1).expect("slice should be successful"); + let sliced = cursor.slice(0, 1); assert_eq!(sliced.num_rows(), 1); let expected = new_primitive(options, ScalarBuffer::from(vec![0]), 0); assert_eq!( @@ -523,7 +523,7 @@ mod tests { ); // with offset - let sliced = cursor.slice(1, 2).expect("slice should be successful"); + let sliced = cursor.slice(1, 2); assert_eq!(sliced.num_rows(), 2); let expected = new_primitive(options, ScalarBuffer::from(vec![1]), 0); assert_eq!( @@ -534,7 +534,7 @@ mod tests { // cursor current position != 
start cursor.advance(); - let sliced = cursor.slice(0, 1).expect("slice should be successful"); + let sliced = cursor.slice(0, 1); assert_eq!(sliced.num_rows(), 1); let expected = new_primitive(options, ScalarBuffer::from(vec![0]), 0); assert_eq!( @@ -542,15 +542,19 @@ mod tests { Ordering::Equal, "should ignore current cursor position when sliced" ); + } - // out of bounds - assert!( - cursor.slice(0, 42).is_err(), - "should return err when slice is out of bounds" - ); - assert!( - cursor.slice(42, 1).is_err(), - "should return err when slice is out of bounds" - ); + #[test] + #[should_panic(expected = "cursor slice out of bounds")] + fn test_slice_panic_can_panic() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let buffer = ScalarBuffer::from(vec![0, 1, 2]); + let cursor = new_primitive(options, buffer, 0); + + cursor.slice(42, 1); } } From 62486120383941763c4c8ee6639f72069c776705 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 12 Oct 2023 21:50:50 -0400 Subject: [PATCH 5/6] fix(7181): handle nulls_first sort option, with cursor slicing --- datafusion/physical-plan/src/sorts/cursor.rs | 61 +++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 95a1a7a0a215a..b9d81fab004ed 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -336,7 +336,7 @@ impl Cursor for FieldCursor { Self { values: values.slice(offset, length), offset: 0, - null_threshold: *null_threshold, + null_threshold: null_threshold.checked_sub(offset).unwrap_or(0), options: *options, } } @@ -557,4 +557,63 @@ mod tests { cursor.slice(42, 1); } + + #[test] + fn test_slice_nulls_first() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let buffer = ScalarBuffer::from(vec![2, i32::MIN, 2]); + let mut a = new_primitive(options, buffer, 1); + let buffer = 
ScalarBuffer::from(vec![3, 2, i32::MIN, 2]); + let mut b = new_primitive(options, buffer, 1); + + // NULL == NULL + assert_eq!(a, b); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // 2 > NULL + a = a.slice(1, 1); + assert_ne!(a, b); + assert_eq!(a.cmp(&b), Ordering::Greater); + + // 2 < 3 + b = b.slice(3, 1); + assert_ne!(a, b); + assert_eq!(a.cmp(&b), Ordering::Less); + } + + #[test] + fn test_slice_no_nulls_first() { + let options = SortOptions { + descending: false, + nulls_first: false, + }; + + let buffer = ScalarBuffer::from(vec![2, i32::MIN, 2]); + let mut a = new_primitive(options, buffer, 1); + let buffer = ScalarBuffer::from(vec![3, 2, i32::MIN, 2]); + let mut b = new_primitive(options, buffer, 1); + + // 2 < 3 + assert_ne!(a, b); + assert_eq!(a.cmp(&b), Ordering::Less); + + // 2 == 2 + b = b.slice(1, 3); + assert_eq!(a, b); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // NULL < 2 + a = a.slice(1, 2); + assert_ne!(a, b); + assert_eq!(a.cmp(&b), Ordering::Less); + + // NULL == NULL + b = b.slice(1, 2); + assert_eq!(a, b); + assert_eq!(a.cmp(&b), Ordering::Equal); + } } From ccaf130e1ef949859356a0d7c6a57f2cc9d49c82 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 13 Oct 2023 19:12:52 -0700 Subject: [PATCH 6/6] fix(7181): properly define null mask in test case, and fix tests for nulls_first and nulls_last slicing --- datafusion/physical-plan/src/sorts/cursor.rs | 107 ++++++++++++++----- 1 file changed, 79 insertions(+), 28 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index b9d81fab004ed..bb62965a38993 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -333,10 +333,18 @@ impl Cursor for FieldCursor { options, } = self; + let null_threshold = match self.options.nulls_first { + true => null_threshold.saturating_sub(offset), + false => { + let shorter_len = self.values.len().saturating_sub(offset + length + 1); +
null_threshold.saturating_sub(offset.saturating_sub(shorter_len)) + } + }; + Self { values: values.slice(offset, length), offset: 0, - null_threshold: null_threshold.checked_sub(offset).unwrap_or(0), + null_threshold, options: *options, } } @@ -368,6 +376,25 @@ mod tests { } } + #[test] + fn test_primitive_null_mask() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let is_min = new_primitive(options, ScalarBuffer::from(vec![i32::MIN]), 0); + assert_eq!(is_min.num_rows(), 1); + let is_null = new_primitive(options, ScalarBuffer::from(vec![i32::MIN]), 1); + assert_eq!(is_null.num_rows(), 1); + + // i32::MIN != NULL + assert_ne!(is_min.cmp(&is_null), Ordering::Equal); // is null mask + + assert!(is_null.is_null()); + assert!(!is_min.is_null()); + } + #[test] fn test_primitive_nulls_first() { let options = SortOptions { @@ -414,6 +441,11 @@ mod tests { a.advance(); assert_eq!(a.cmp(&b), Ordering::Less); + // finished + assert!(!b.is_finished()); + b.advance(); + assert!(b.is_finished()); + let options = SortOptions { descending: false, nulls_first: false, @@ -440,6 +472,12 @@ mod tests { assert_eq!(a.cmp(&b), Ordering::Equal); assert_eq!(a, b); + // finished + assert!(!a.is_finished()); + a.advance(); + a.advance(); + assert!(a.is_finished()); + let options = SortOptions { descending: true, nulls_first: false, @@ -565,55 +603,68 @@ mod tests { nulls_first: true, }; - let buffer = ScalarBuffer::from(vec![2, i32::MIN, 2]); - let mut a = new_primitive(options, buffer, 1); - let buffer = ScalarBuffer::from(vec![3, 2, i32::MIN, 2]); - let mut b = new_primitive(options, buffer, 1); + let is_min = new_primitive(options, ScalarBuffer::from(vec![i32::MIN]), 0); + + let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); + let mut a = new_primitive(options, buffer, 2); + assert_eq!(a.num_rows(), 4); + let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); + let mut b = new_primitive(options, buffer, 2); + 
assert_eq!(b.num_rows(), 5); // NULL == NULL - assert_eq!(a, b); + assert!(a.is_null()); assert_eq!(a.cmp(&b), Ordering::Equal); - // 2 > NULL - a = a.slice(1, 1); - assert_ne!(a, b); + // i32::MIN > NULL + a = a.slice(3, 1); + assert_eq!(a, is_min); assert_eq!(a.cmp(&b), Ordering::Greater); - // 2 < 3 - b = b.slice(3, 1); - assert_ne!(a, b); + // i32::MIN == i32::MIN + b = b.slice(3, 2); + assert_eq!(b, is_min); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // i32::MIN < 2 + b = b.slice(1, 1); assert_eq!(a.cmp(&b), Ordering::Less); } #[test] - fn test_slice_no_nulls_first() { + fn test_slice_nulls_last() { let options = SortOptions { descending: false, nulls_first: false, }; - let buffer = ScalarBuffer::from(vec![2, i32::MIN, 2]); - let mut a = new_primitive(options, buffer, 1); - let buffer = ScalarBuffer::from(vec![3, 2, i32::MIN, 2]); - let mut b = new_primitive(options, buffer, 1); + let is_min = new_primitive(options, ScalarBuffer::from(vec![i32::MIN]), 0); - // 2 < 3 - assert_ne!(a, b); - assert_eq!(a.cmp(&b), Ordering::Less); + let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); + let mut a = new_primitive(options, buffer, 2); + assert_eq!(a.num_rows(), 4); + let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); + let mut b = new_primitive(options, buffer, 2); + assert_eq!(b.num_rows(), 5); - // 2 == 2 - b = b.slice(1, 3); - assert_eq!(a, b); + // i32::MIN == i32::MIN + assert_eq!(a, is_min); assert_eq!(a.cmp(&b), Ordering::Equal); - // NULL < 2 - a = a.slice(1, 2); - assert_ne!(a, b); + // i32::MIN < -284 + b = b.slice(1, 3); // slice to full length assert_eq!(a.cmp(&b), Ordering::Less); + // 79 > -284 + a = a.slice(1, 2); // slice to shorter than full length + assert!(!a.is_null()); + assert_eq!(a.cmp(&b), Ordering::Greater); + // NULL == NULL - b = b.slice(1, 2); - assert_eq!(a, b); + a = a.slice(1, 1); + b = b.slice(2, 1); + assert!(a.is_null()); + assert!(b.is_null()); assert_eq!(a.cmp(&b), Ordering::Equal); } }