From ca033e09ba15ff66e38c9821c52e1a6c61c76497 Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 14:35:27 +0800 Subject: [PATCH 01/14] define `ByteGroupValueViewBuilder`. --- .../aggregates/group_values/group_column.rs | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 5d00f300e960c..4092809b4fb6f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -20,12 +20,20 @@ use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; use arrow::array::PrimitiveArray; +use arrow::array::PrimitiveBuilder; +use arrow::array::StringBuilder; +use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; use arrow::datatypes::ByteArrayType; +use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; +use arrow_array::BinaryViewArray; +use arrow_array::GenericByteViewArray; +use arrow_array::StringViewArray; +use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; @@ -376,6 +384,64 @@ where } } +/// An implementation of [`GroupColumn`] for binary view and utf8 view types. +/// +/// Stores a collection of binary view or utf8 view group values in a buffer +/// whose structure is similar to `GenericByteViewArray`, and we can get benefits: +/// +/// 1. Efficient comparison of incoming rows to existing rows +/// 2. Efficient construction of the final output array +/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder` +pub struct ByteGroupValueViewBuilder { + output_type: OutputType, + + /// The views of string values + /// + /// If string len <= 12, the view's format will be: + /// string(12B) | len(4B) + /// + /// If string len > 12, its format will be: + /// offset(4B) | buffer_index(4B) | prefix(4B) | len(4B) + views: Vec, + + /// The progressing block + /// + /// New values will be inserted into it until its capacity + /// is not enough(detail can see `max_block_size`). + in_progress: Vec, + + /// The completed blocks + completed: Vec, + + /// The max size of `in_progress` + /// + /// `in_progress` will be flushed into `completed`, and create new `in_progress` + /// when found its remaining capacity(`max_block_size` - `len(in_progress)`), + /// is no enough to store the appended value. + max_block_size: usize, + + /// Nulls + nulls: MaybeNullBufferBuilder, +} + +// impl ByteGroupValueViewBuilder { +// fn append_val_inner(&mut self, array: &ArrayRef, row: usize) +// where +// B: ByteViewType, +// { +// let arr = array.as_byte_view::(); +// if arr.is_null(row) { +// self.nulls.append(true); +// self.views.push(0); +// } else { +// self.nulls.append(false); +// let value: &[u8] = arr.value(row).as_ref(); +// self.buffer.append_slice(value); +// self.offsets.push(O::usize_as(self.buffer.len())); +// } +// } +// } + /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. /// From ffcc1a25a427999b36c72bc6a9d28521aad3643b Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 17:07:06 +0800 Subject: [PATCH 02/14] impl append. --- .../aggregates/group_values/group_column.rs | 71 ++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 4092809b4fb6f..e39afa5ba7396 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::make_view; use arrow::array::BufferBuilder; use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; @@ -39,6 +40,7 @@ use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::mem; use std::sync::Arc; use std::vec; @@ -424,23 +426,58 @@ pub struct ByteGroupValueViewBuilder { nulls: MaybeNullBufferBuilder, } -// impl ByteGroupValueViewBuilder { -// fn append_val_inner(&mut self, array: &ArrayRef, row: usize) -// where -// B: ByteViewType, -// { -// let arr = array.as_byte_view::(); -// if arr.is_null(row) { -// self.nulls.append(true); -// self.views.push(0); -// } else { -// self.nulls.append(false); -// let value: &[u8] = arr.value(row).as_ref(); -// self.buffer.append_slice(value); -// self.offsets.push(O::usize_as(self.buffer.len())); -// } -// } -// } +impl ByteGroupValueViewBuilder { + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) + where + B: ByteViewType, + { + let arr = array.as_byte_view::(); + + // If a null row, set and return + if arr.is_null(row) { + self.nulls.append(true); + self.views.push(0); + return; + } + + // Not null case + self.nulls.append(false); + let value: &[u8] = arr.value(row).as_ref(); + + let value_len = value.len(); + let view = if value_len > 12 { + // Ensure big enough block to hold the value firstly + self.ensure_in_progress_big_enough(value_len); + + // Append value + let block_id = self.completed.len(); + let offset = self.in_progress.len(); + self.in_progress.extend_from_slice(value); + + make_view(value, block_id, offset) + } else { + make_view(value, 0, 0) + }; + + // Append view + self.views.push(view); + } + + fn ensure_in_progress_big_enough(&mut self, value_len: usize) { + debug_assert!(value_len > 12); + let require_cap = self.in_progress.len() + value_len; + + // If current block isn't big enough, flush it and create a new in progress block + if require_cap > self.max_block_size { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } + } +} /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. From 48429654c9d3a2278b680168cd767a68ffe00ce9 Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 19:01:46 +0800 Subject: [PATCH 03/14] impl equal to. --- .../aggregates/group_values/group_column.rs | 84 ++++++++++++++++++- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index e39afa5ba7396..a4361cd1ab203 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -17,6 +17,7 @@ use arrow::array::make_view; use arrow::array::BufferBuilder; +use arrow::array::ByteView; use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; @@ -445,7 +446,9 @@ impl ByteGroupValueViewBuilder { let value: &[u8] = arr.value(row).as_ref(); let value_len = value.len(); - let view = if value_len > 12 { + let view = if value_len <= 12 { + make_view(value, 0, 0) + } else { // Ensure big enough block to hold the value firstly self.ensure_in_progress_big_enough(value_len); @@ -455,8 +458,6 @@ impl ByteGroupValueViewBuilder { self.in_progress.extend_from_slice(value); make_view(value, block_id, offset) - } else { - make_view(value, 0, 0) }; // Append view @@ -477,6 +478,83 @@ impl ByteGroupValueViewBuilder { self.completed.push(buffer); } } + + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool + where + B: ByteViewType, + { + let array = array.as_byte_view::(); + + // Check if nulls equal firstly + let exist_null = self.nulls.is_null(lhs_row); + let input_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(exist_null, input_null) { + return result; + } + + // Otherwise, we need to check their values + let exist_view = self.views[lhs_row]; + let exist_view_len = exist_view as u32; + + let input_view = array.views()[rhs_row]; + let input_view_len = input_view as u32; + + // The check logic + // - Check len equality + // - If non-inlined, check prefix and then check value in buffer + // when needed + // - If inlined, check inlined value + if exist_view_len != input_view_len { + return false; + } + + if exist_view_len <= 12 { + let exist_inline = unsafe { + GenericByteViewArray::::inline_value( + &exist_view, + exist_view_len as usize, + ) + }; + let input_inline = unsafe { + GenericByteViewArray::::inline_value( + &input_view, + input_view_len as usize, + ) + }; + exist_inline == input_inline + } else { + let exist_prefix = + unsafe { GenericByteViewArray::::inline_value(&exist_view, 4) }; + let input_prefix = + unsafe { GenericByteViewArray::::inline_value(&input_view, 4) }; + + if exist_prefix != input_prefix { + return false; + } + + let exist_full = { + let byte_view = ByteView::from(exist_view); + self.value( + byte_view.buffer_index as usize, + byte_view.offset as usize, + byte_view.length as usize, + ) + }; + let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() }; + exist_full == input_full + } + } + + fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] { + debug_assert!(buffer_index <= self.completed.len()); + + if buffer_index < self.completed.len() { + let block = &self.completed[buffer_index]; + &block[offset..offset + length] + } else { + &self.in_progress[offset..offset + length] + } + } } /// Determines if the nullability of the existing and new input array can be used From 66bb7be75b1ec8952e6b94535bc10e9af6623f9f Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 10 Oct 2024 01:52:09 +0800 Subject: [PATCH 04/14] fix compile. --- .../src/aggregates/group_values/group_column.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index a4361cd1ab203..61894ffe4a051 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -453,11 +453,11 @@ impl ByteGroupValueViewBuilder { self.ensure_in_progress_big_enough(value_len); // Append value - let block_id = self.completed.len(); + let buffer_index = self.completed.len(); let offset = self.in_progress.len(); self.in_progress.extend_from_slice(value); - make_view(value, block_id, offset) + make_view(value, buffer_index as u32, offset as u32) }; // Append view @@ -510,13 +510,13 @@ impl ByteGroupValueViewBuilder { if exist_view_len <= 12 { let exist_inline = unsafe { - GenericByteViewArray::::inline_value( + GenericByteViewArray::::inline_value( &exist_view, exist_view_len as usize, ) }; let input_inline = unsafe { - GenericByteViewArray::::inline_value( + GenericByteViewArray::::inline_value( &input_view, input_view_len as usize, ) From ef1efcecf682bb4e39b6f9f91555075650572ea9 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 10 Oct 2024 02:13:06 +0800 Subject: [PATCH 05/14] fix comments. --- .../physical-plan/src/aggregates/group_values/group_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 61894ffe4a051..13fc0865115f0 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -501,9 +501,9 @@ impl ByteGroupValueViewBuilder { // The check logic // - Check len equality + // - If inlined, check inlined value // - If non-inlined, check prefix and then check value in buffer // when needed - // - If inlined, check inlined value if exist_view_len != input_view_len { return false; } From f8d77923773d7add385c9bbadacc0c021168063f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 13:46:11 -0400 Subject: [PATCH 06/14] Wire in GroupColumn storage --- .../src/aggregates/group_values/column.rs | 23 +++- .../aggregates/group_values/group_column.rs | 115 +++++++++++------- 2 files changed, 88 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 28f35b2bded2e..246a569bb0204 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -16,7 +16,8 @@ // under the License. use crate::aggregates::group_values::group_column::{ - ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, + ByteGroupValueBuilder, ByteViewGroupValueBuilder, BytesOutputType, GroupColumn, + PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; @@ -26,13 +27,13 @@ use arrow::datatypes::{ Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow::record_batch::RecordBatch; +use arrow_array::types::{BinaryViewType, StringViewType}; use arrow_array::{Array, ArrayRef}; use arrow_schema::{DataType, Schema, SchemaRef}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; -use datafusion_physical_expr::binary_map::OutputType; use hashbrown::raw::RawTable; @@ -169,19 +170,29 @@ impl GroupValues for GroupValuesColumn { &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), &DataType::Utf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); + let b = ByteGroupValueBuilder::::new(BytesOutputType::Utf8); v.push(Box::new(b) as _) } &DataType::LargeUtf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); + let b = ByteGroupValueBuilder::::new(BytesOutputType::Utf8); v.push(Box::new(b) as _) } &DataType::Binary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); + let b = + ByteGroupValueBuilder::::new(BytesOutputType::Binary); v.push(Box::new(b) as _) } &DataType::LargeBinary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); + let b = + ByteGroupValueBuilder::::new(BytesOutputType::Binary); + v.push(Box::new(b) as _) + } + &DataType::Utf8View => { + let b = ByteViewGroupValueBuilder::::new(); + v.push(Box::new(b) as _) + } + &DataType::BinaryView => { + let b = ByteViewGroupValueBuilder::::new(); v.push(Box::new(b) as _) } dt => { diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 13fc0865115f0..02a728b7079dd 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -22,9 +22,6 @@ use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; use arrow::array::PrimitiveArray; -use arrow::array::PrimitiveBuilder; -use arrow::array::StringBuilder; -use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; @@ -32,15 +29,14 @@ use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; -use arrow_array::BinaryViewArray; use arrow_array::GenericByteViewArray; -use arrow_array::StringViewArray; use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; +use std::marker::PhantomData; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; -use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use datafusion_physical_expr_common::binary_map::INITIAL_BUFFER_CAPACITY; use std::mem; use std::sync::Arc; use std::vec; @@ -71,6 +67,13 @@ pub trait GroupColumn: Send + Sync { fn take_n(&mut self, n: usize) -> ArrayRef; } +/// Should the builder output binary or UTF8? +#[derive(Debug, PartialEq, Clone, Copy)] +pub(crate) enum BytesOutputType { + Binary, + Utf8, +} + /// An implementation of [`GroupColumn`] for primitive values /// /// Optimized to skip null buffer construction if the input is known to be non nullable @@ -178,7 +181,7 @@ pub struct ByteGroupValueBuilder where O: OffsetSizeTrait, { - output_type: OutputType, + bytes_output_type: BytesOutputType, buffer: BufferBuilder, /// Offsets into `buffer` for each distinct value. These offsets as used /// directly to create the final `GenericBinaryArray`. The `i`th string is @@ -193,9 +196,9 @@ impl ByteGroupValueBuilder where O: OffsetSizeTrait, { - pub fn new(output_type: OutputType) -> Self { + pub fn new(bytes_output_type: BytesOutputType) -> Self { Self { - output_type, + bytes_output_type, buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY), offsets: vec![O::default()], nulls: MaybeNullBufferBuilder::new(), @@ -249,43 +252,41 @@ where { fn equal_to(&self, lhs_row: usize, column: &ArrayRef, rhs_row: usize) -> bool { // Sanity array type - match self.output_type { - OutputType::Binary => { + match self.bytes_output_type { + BytesOutputType::Binary => { debug_assert!(matches!( column.data_type(), DataType::Binary | DataType::LargeBinary )); self.equal_to_inner::>(lhs_row, column, rhs_row) } - OutputType::Utf8 => { + BytesOutputType::Utf8 => { debug_assert!(matches!( column.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); self.equal_to_inner::>(lhs_row, column, rhs_row) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), } } fn append_val(&mut self, column: &ArrayRef, row: usize) { // Sanity array type - match self.output_type { - OutputType::Binary => { + match self.bytes_output_type { + BytesOutputType::Binary => { debug_assert!(matches!( column.data_type(), DataType::Binary | DataType::LargeBinary )); self.append_val_inner::>(column, row) } - OutputType::Utf8 => { + BytesOutputType::Utf8 => { debug_assert!(matches!( column.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); self.append_val_inner::>(column, row) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), }; } @@ -301,7 +302,7 @@ where fn build(self: Box) -> ArrayRef { let Self { - output_type, + bytes_output_type: output_type, mut buffer, offsets, nulls, @@ -314,13 +315,13 @@ where let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; let values = buffer.finish(); match output_type { - OutputType::Binary => { + BytesOutputType::Binary => { // SAFETY: the offsets were constructed correctly Arc::new(unsafe { GenericBinaryArray::new_unchecked(offsets, values, null_buffer) }) } - OutputType::Utf8 => { + BytesOutputType::Utf8 => { // SAFETY: // 1. the offsets were constructed safely // @@ -331,7 +332,6 @@ where GenericStringArray::new_unchecked(offsets, values, null_buffer) }) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), } } @@ -364,14 +364,14 @@ where let values = self.buffer.finish(); self.buffer = remaining_buffer; - match self.output_type { - OutputType::Binary => { + match self.bytes_output_type { + BytesOutputType::Binary => { // SAFETY: the offsets were constructed correctly Arc::new(unsafe { GenericBinaryArray::new_unchecked(offsets, values, null_buffer) }) } - OutputType::Utf8 => { + BytesOutputType::Utf8 => { // SAFETY: // 1. the offsets were constructed safely // @@ -382,7 +382,6 @@ where GenericStringArray::new_unchecked(offsets, values, null_buffer) }) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), } } } @@ -395,9 +394,7 @@ where /// 1. Efficient comparison of incoming rows to existing rows /// 2. Efficient construction of the final output array /// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder` -pub struct ByteGroupValueViewBuilder { - output_type: OutputType, - +pub struct ByteViewGroupValueBuilder { /// The views of string values /// /// If string len <= 12, the view's format will be: @@ -425,13 +422,24 @@ pub struct ByteGroupValueViewBuilder { /// Nulls nulls: MaybeNullBufferBuilder, + + /// phantom data so the type requires + _phantom: PhantomData, } -impl ByteGroupValueViewBuilder { - fn append_val_inner(&mut self, array: &ArrayRef, row: usize) - where - B: ByteViewType, - { +impl ByteViewGroupValueBuilder { + pub fn new() -> Self { + Self { + views: vec![], + in_progress: Vec::with_capacity(INITIAL_BUFFER_CAPACITY), + completed: vec![], + max_block_size: INITIAL_BUFFER_CAPACITY, + nulls: MaybeNullBufferBuilder::new(), + _phantom: PhantomData {}, + } + } + + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) { let arr = array.as_byte_view::(); // If a null row, set and return @@ -479,10 +487,7 @@ impl ByteGroupValueViewBuilder { } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool - where - B: ByteViewType, - { + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { let array = array.as_byte_view::(); // Check if nulls equal firstly @@ -557,6 +562,32 @@ impl ByteGroupValueViewBuilder { } } +impl GroupColumn for ByteViewGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + todo!() + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) { + todo!() + } + + fn len(&self) -> usize { + todo!() + } + + fn size(&self) -> usize { + todo!() + } + + fn build(self: Box) -> ArrayRef { + todo!() + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + todo!() + } +} + /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. /// @@ -573,20 +604,16 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option { #[cfg(test)] mod tests { + use super::*; use std::sync::Arc; use arrow::datatypes::Int64Type; use arrow_array::{ArrayRef, Int64Array, StringArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; - use datafusion_physical_expr::binary_map::OutputType; - - use crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder; - - use super::{ByteGroupValueBuilder, GroupColumn}; #[test] fn test_take_n() { - let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); + let mut builder = ByteGroupValueBuilder::::new(BytesOutputType::Utf8); let array = Arc::new(StringArray::from(vec![Some("a"), None])) as ArrayRef; // a, null, null builder.append_val(&array, 0); @@ -713,7 +740,7 @@ mod tests { // - exist not null, input not null; values equal // Define PrimitiveGroupValueBuilder - let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); + let mut builder = ByteGroupValueBuilder::::new(BytesOutputType::Utf8); let builder_array = Arc::new(StringArray::from(vec![ None, None, From deb64e684eb536c8f0b74295f21f08e151301c7b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 17:15:32 -0400 Subject: [PATCH 07/14] Add tests --- .../sqllogictest/test_files/group_by.slt | 65 ++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index a80a0891e9770..4807342c5a1a9 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -5207,4 +5207,67 @@ NULL NULL 2 NULL a 2 statement ok -drop table t; +drop table t + + +# test multi group by int + utf8view +statement ok +create table source as values +-- use some strings that are larger than 12 characters as that goes through a different path +(1, 'a'), +(1, 'a'), +(2, 'thisstringislongerthan12'), +(2, 'thisstring'), +(3, 'abc'), +(3, 'cba'), +(2, 'thisstring'), +(null, null), +(null, 'a'), +(null, null), +(null, 'a'), +(2, 'thisstringisalsolongerthan12'), +(2, 'thisstringislongerthan12'), +(1, 'null') +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b from source; + +query ITI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 a 2 +1 null 1 +2 thisstring 2 +2 thisstringisalsolongerthan12 1 +2 thisstringislongerthan12 2 +3 abc 1 +3 cba 1 +NULL a 2 +NULL NULL 2 + +statement ok +drop view t + +# test with binary view +statement ok +create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b from source; + +query I?I +select a, b, count(*) from t group by a, b order by a, b; +---- +1 61 2 +1 6e756c6c 1 +2 74686973737472696e67 2 +2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1 +2 74686973737472696e6769736c6f6e6765727468616e3132 2 +3 616263 1 +3 636261 1 +NULL 61 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; From 9581d0cb73743d4a065d7bd3fe1c0959714c9c58 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 17:16:41 -0400 Subject: [PATCH 08/14] hook up testing --- datafusion/physical-plan/src/aggregates/group_values/column.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 246a569bb0204..3cd59390971e6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -120,6 +120,8 @@ impl GroupValuesColumn { | DataType::LargeBinary | DataType::Date32 | DataType::Date64 + | DataType::Utf8View + | DataType::BinaryView ) } } From fc570d9970f386039677e5a97c517f65ea4a7cb4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 17:29:17 -0400 Subject: [PATCH 09/14] Connect up --- .../aggregates/group_values/group_column.rs | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 02a728b7079dd..988efd7696ac7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -564,23 +564,48 @@ impl ByteViewGroupValueBuilder { impl GroupColumn for ByteViewGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - todo!() + self.equal_to_inner(lhs_row, array, rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - todo!() + self.append_val_inner(array, row) } fn len(&self) -> usize { - todo!() + self.views.len() } fn size(&self) -> usize { - todo!() + self.views.capacity() * std::mem::size_of::() + + self.in_progress.len() + + self.completed.iter().map(|b| b.len()).sum::() + + self.nulls.allocated_size() } fn build(self: Box) -> ArrayRef { - todo!() + let Self { + views, + in_progress, + mut completed, + max_block_size: _, + nulls, + _phantom, + } = *self; + + // complete the in progress view + completed.push(in_progress.into()); + + // safety + // all input values came from arrays of the correct type (so were valid utf8 if string) + // all views were created correctly + let arr = unsafe { + GenericByteViewArray::::new_unchecked( + views.into(), + completed, + nulls.build(), + ) + }; + Arc::new(arr) } fn take_n(&mut self, n: usize) -> ArrayRef { From ff3d0a71b3a4cadf3079b69316229b5cdda51bcf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 17:44:06 -0400 Subject: [PATCH 10/14] add test for equal to --- .../aggregates/group_values/group_column.rs | 79 ++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 988efd7696ac7..f8f47095f76b9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -560,6 +560,12 @@ impl ByteViewGroupValueBuilder { &self.in_progress[offset..offset + length] } } + + fn inner_take_n(&mut self, n: usize) -> ArrayRef { + // take the views and then need to clear exisiting buffers + todo!() + + } } impl GroupColumn for ByteViewGroupValueBuilder { @@ -609,7 +615,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { } fn take_n(&mut self, n: usize) -> ArrayRef { - todo!() + self.inner_take_n(n) } } @@ -633,7 +639,8 @@ mod tests { use std::sync::Arc; use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Int64Array, StringArray}; + use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; + use arrow_array::types::StringViewType; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; #[test] @@ -812,4 +819,72 @@ mod tests { assert!(!builder.equal_to(4, &input_array, 4)); assert!(builder.equal_to(5, &input_array, 5)); } + + + + #[test] + fn test_byte_view_equal_to() { + // Will cover such cases: + // - exist null, input not null + // - exist null, input null; values not equal + // - exist null, input null; values equal + // - exist not null, input null + // - exist not null, input not null; values not equal + // - exist not null, input not null; values equal + + // Define PrimitiveGroupValueBuilder + let mut builder = ByteViewGroupValueBuilder::::new(); + let builder_array = Arc::new(StringViewArray::from(vec![ + None, + None, + None, + Some("foo"), + Some("bar"), + Some("this string is quite long"), + Some("baz"), + ])) as ArrayRef; + builder.append_val(&builder_array, 0); + builder.append_val(&builder_array, 1); + builder.append_val(&builder_array, 2); + builder.append_val(&builder_array, 3); + builder.append_val(&builder_array, 4); + builder.append_val(&builder_array, 5); + builder.append_val(&builder_array, 6); + + // Define input array + let (views, buffer, _nulls) = StringViewArray::from(vec![ + Some("foo"), + Some("bar"), // set to null + Some("this string is quite long"), // set to null + None, + None, + Some("foo"), + Some("baz"), + ]) + .into_parts(); + + // explicitly build a boolean buffer where one of the null values also happens to match + let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(false); // this sets Some("bar") to null above + boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + let nulls = NullBuffer::new(boolean_buffer_builder.finish()); + let input_array = + Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; + + // Check + assert!(!builder.equal_to(0, &input_array, 0)); + assert!(builder.equal_to(1, &input_array, 1)); + assert!(builder.equal_to(2, &input_array, 2)); + assert!(!builder.equal_to(3, &input_array, 3)); + assert!(!builder.equal_to(4, &input_array, 4)); + assert!(!builder.equal_to(5, &input_array, 5)); + assert!(builder.equal_to(6, &input_array, 6)); + + } + } From 364a0671566a33f5ffb32e7d2e21ebec81e83d7d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 17:54:49 -0400 Subject: [PATCH 11/14] Add test for creation --- .../aggregates/group_values/group_column.rs | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index f8f47095f76b9..31dcac601f8c5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -439,6 +439,13 @@ impl ByteViewGroupValueBuilder { } } + /// Set the max block size + #[cfg(test)] + fn with_max_block_size(mut self, max_block_size: usize) -> Self { + self.max_block_size = max_block_size; + self + } + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) { let arr = array.as_byte_view::(); @@ -564,7 +571,6 @@ impl ByteViewGroupValueBuilder { fn inner_take_n(&mut self, n: usize) -> ArrayRef { // take the views and then need to clear exisiting buffers todo!() - } } @@ -639,8 +645,8 @@ mod tests { use std::sync::Arc; use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_array::types::StringViewType; + use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; #[test] @@ -820,7 +826,32 @@ mod tests { assert!(builder.equal_to(5, &input_array, 5)); } + #[test] + fn test_byte_view_append_val() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(30); + let builder_array = StringViewArray::from(vec![ + Some("this string is quite long"), // in buffer 0 + Some("foo"), + None, + Some("bar"), + Some("this string is also quite long"), // buffer 0 + Some("this string is quite long"), // buffer 1 + Some("bar"), + ]); + let builder_array: ArrayRef = Arc::new(builder_array); + for row in 0..builder_array.len() { + builder.append_val(&builder_array, row); + } + let output = Box::new(builder).build(); + assert_eq!( + output.as_string_view().data_buffers().len(), + 2, + "output should have 2 buffers" + ); + assert_eq!(&output, &builder_array) + } #[test] fn test_byte_view_equal_to() { @@ -832,7 +863,6 @@ mod tests { // - exist not null, input not null; values not equal // - exist not null, input not null; values equal - // Define PrimitiveGroupValueBuilder let mut builder = ByteViewGroupValueBuilder::::new(); let builder_array = Arc::new(StringViewArray::from(vec![ None, @@ -854,14 +884,14 @@ mod tests { // Define input array let (views, buffer, _nulls) = StringViewArray::from(vec![ Some("foo"), - Some("bar"), // set to null + Some("bar"), // set to null Some("this string is quite long"), // set to null None, None, Some("foo"), Some("baz"), ]) - .into_parts(); + .into_parts(); // explicitly build a boolean buffer where one of the null values also happens to match let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); @@ -884,7 +914,5 @@ mod tests { assert!(!builder.equal_to(4, &input_array, 4)); assert!(!builder.equal_to(5, &input_array, 5)); assert!(builder.equal_to(6, &input_array, 6)); - } - } From dc9bede8c29493345d08f32e4de545aad8709461 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 18:03:11 -0400 Subject: [PATCH 12/14] complete test --- .../aggregates/group_values/group_column.rs | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 31dcac601f8c5..c12529dacacef 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -829,7 +829,7 @@ mod tests { #[test] fn test_byte_view_append_val() { let mut builder = - ByteViewGroupValueBuilder::::new().with_max_block_size(30); + ByteViewGroupValueBuilder::::new().with_max_block_size(60); let builder_array = StringViewArray::from(vec![ Some("this string is quite long"), // in buffer 0 Some("foo"), @@ -845,10 +845,10 @@ mod tests { } let output = Box::new(builder).build(); + // should be 2 output buffers to hold all the data assert_eq!( output.as_string_view().data_buffers().len(), 2, - "output should have 2 buffers" ); assert_eq!(&output, &builder_array) } @@ -915,4 +915,56 @@ mod tests { assert!(!builder.equal_to(5, &input_array, 5)); assert!(builder.equal_to(6, &input_array, 6)); } + + + #[test] + fn test_byte_view_take_n() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + let input_array = StringViewArray::from(vec![ + Some("this string is quite long"), // in buffer 0 + Some("foo"), + Some("bar"), + Some("this string is also quite long"), // buffer 0 + Some("this string is quite long"), // buffer 1 + Some("another string that is is quite long"), // buffer 2 + Some("bar"), + ]); + let input_array: ArrayRef = Arc::new(input_array); + for row in 0..input_array.len() { + builder.append_val(&input_array, row); + } + + // should be 2 completed, one in progress buffer to hold all output + assert_eq!(builder.completed.len(), 2); + assert!(builder.in_progress.len() > 0); + + let first_4 = builder.take_n(4); + assert_eq!(&first_4, &input_array.slice(0, 4)); + + // Add some new data after the first n + let input_array = StringViewArray::from(vec![ + Some("short"), + Some("Some new data to add that is long"), // in buffer 0 + Some("short again"), + ]); + let input_array: ArrayRef = Arc::new(input_array); + for row in 0..input_array.len() { + builder.append_val(&input_array, row); + } + + let result = Box::new(builder).build(); + let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ + // last three rows of the original input + Some("this string is quite long"), + Some("another string that is is quite long"), + Some("bar"), + // last three rows of the subsequent input + Some("short"), + Some("Some new data to add that is long"), // in buffer 0 + Some("short again"), + ])); + assert_eq!(result, expected); + } + } From e769f96cad73d58b5dfc873c5434fa789453433e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 18:05:22 -0400 Subject: [PATCH 13/14] take-n test --- .../src/aggregates/group_values/group_column.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index c12529dacacef..97044250d0cf1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -926,8 +926,10 @@ mod tests { Some("foo"), Some("bar"), Some("this string is also quite long"), // buffer 0 + None, Some("this string is quite long"), // buffer 1 - Some("another string that is is quite long"), // buffer 2 + None, + Some("another string that is is quite long"), // buffer 1 Some("bar"), ]); let input_array: ArrayRef = Arc::new(input_array); @@ -945,6 +947,7 @@ mod tests { // Add some new data after the first n let input_array = StringViewArray::from(vec![ Some("short"), + None, Some("Some new data to add that is long"), // in buffer 0 Some("short again"), ]); @@ -955,16 +958,19 @@ mod tests { let result = Box::new(builder).build(); let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ - // last three rows of the original input + // last rows of the original input + None, Some("this string is quite long"), + None, Some("another string that is is quite long"), Some("bar"), - // last three rows of the subsequent input + // the subsequent input Some("short"), + None, Some("Some new data to add that is long"), // in buffer 0 Some("short again"), ])); - assert_eq!(result, expected); + assert_eq!(&result, &expected); } } From f3f4e65054c0e9c59522142cc7aa9b909be85999 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 11 Oct 2024 18:36:22 -0400 Subject: [PATCH 14/14] basic take_n --- .../aggregates/group_values/group_column.rs | 91 +++++++++++++++++-- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 97044250d0cf1..e5333e92d877a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -29,7 +29,7 @@ use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; -use arrow_array::GenericByteViewArray; +use arrow_array::{GenericByteViewArray, StringViewArray}; use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; use std::marker::PhantomData; @@ -38,6 +38,7 @@ use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::INITIAL_BUFFER_CAPACITY; use std::mem; +use std::str::from_utf8; use std::sync::Arc; use std::vec; @@ -485,15 +486,21 @@ impl ByteViewGroupValueBuilder { // If current block isn't big enough, flush it and create a new in progress block if require_cap > self.max_block_size { - let flushed_block = mem::replace( - &mut self.in_progress, - Vec::with_capacity(self.max_block_size), - ); - let buffer = Buffer::from_vec(flushed_block); - self.completed.push(buffer); + self.finish_in_progress() } } + /// Finishes the current in progress block and appends it to self.completed + fn finish_in_progress(&mut self) { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } + + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { let array = array.as_byte_view::(); @@ -569,11 +576,76 @@ impl ByteViewGroupValueBuilder { } fn inner_take_n(&mut self, n: usize) -> ArrayRef { - // take the views and then need to clear exisiting buffers - todo!() + let views = self.views.drain(0..n).collect::>(); + + let num_buffers_in_first_n = if let Some(idx) = higest_buffer_index(&views) { + let idx = idx as usize; + // if the views we are returning contain currently in progress + // buffer, finalize it + if idx > self.completed.len() { + self.finish_in_progress(); + } + idx+1 + } else { + 0 + }; + + let buffers = self.completed[0..num_buffers_in_first_n].to_vec(); + let nulls = self.nulls.take_n(n); + + // TODO: remove any buffers no longer referenced + if let Some(idx) = higest_buffer_index(&self.views) { + //if idx as usize <= self.completed.len() { + // self.completed.drain() + //} + // update existing views to reflect removed buffers + } + + + // safety + // all input values came from arrays of the correct type (so were valid utf8 if string) + // all views were created correctly + let arr = unsafe { + GenericByteViewArray::::new_unchecked( + views.into(), + buffers, + nulls, + ) + }; + Arc::new(arr) } } +/// returns the highest buffer index referred to in a set og u128 views +/// assumes that all buffer indexes are monotonically increasing +/// +/// Returns `None` if no buffers are referenced (e.g. all views are inlined) +fn higest_buffer_index(views: &[u128]) -> Option { + println!("Checking views"); + for v in views.iter() { + let view_len = (*v as u32) as usize; + if view_len < 12 { + let prefix = unsafe { StringViewArray::inline_value(v, view_len) }; + println!("len: {view_len} inline: {}", from_utf8(prefix).unwrap()) + } + else { + println!( + "len: {view_len} val: {:?}", + ByteView::from(*v) + ); + } + } + views.iter().rev().find_map(|view| { + let view_len = *view as u32; + if view_len < 12 { + None // inline view + } else { + let view = ByteView::from(*view); + Some(view.buffer_index) + } + }) +} + impl GroupColumn for ByteViewGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { self.equal_to_inner(lhs_row, array, rhs_row) @@ -942,6 +1014,7 @@ mod tests { assert!(builder.in_progress.len() > 0); let first_4 = builder.take_n(4); + println!("{}", arrow::util::pretty::pretty_format_columns("first_4", &[first_4.clone()]).unwrap()); assert_eq!(&first_4, &input_array.slice(0, 4)); // Add some new data after the first n