diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 7969244200568..2a06f3fbab02e 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -17,16 +17,16 @@ //! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from //! `StringViewArray`/`BinaryViewArray`. -//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the -//! [`GenericByteViewBuilder`]. use crate::binary_map::OutputType; use ahash::RandomState; use arrow::array::cast::AsArray; -use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; +use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view}; +use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; use std::fmt::Debug; +use std::mem::size_of; use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that @@ -113,6 +113,9 @@ impl ArrowBytesViewSet { /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store /// group values when they are a single string array. +/// Max size of the in-progress buffer before flushing to completed buffers +const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; + pub struct ArrowBytesViewMap where V: Debug + PartialEq + Eq + Clone + Copy + Default, @@ -124,8 +127,15 @@ where /// Total size of the map in bytes map_size: usize, - /// Builder for output array - builder: GenericByteViewBuilder, + /// Views for all stored values (in insertion order) + views: Vec, + /// In-progress buffer for out-of-line string data + in_progress: Vec, + /// Completed buffers containing string data + completed: Vec, + /// Tracks null values (true = null) + nulls: Vec, + /// random state used to generate hashes random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) @@ -148,7 +158,10 @@ where output_type, map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY), map_size: 0, - builder: GenericByteViewBuilder::new(), + views: Vec::new(), + in_progress: Vec::new(), + completed: Vec::new(), + nulls: Vec::new(), random_state: RandomState::new(), hashes_buffer: vec![], null: None, @@ -250,52 +263,92 @@ where // step 2: insert each value into the set, if not already present let values = values.as_byte_view::(); + // Get raw views buffer for direct comparison + let input_views = values.views(); + // Ensure lengths are equivalent - assert_eq!(values.len(), batch_hashes.len()); + assert_eq!(values.len(), self.hashes_buffer.len()); + + for i in 0..values.len() { + let view_u128 = input_views[i]; + let hash = self.hashes_buffer[i]; - for (value, &hash) in values.iter().zip(batch_hashes.iter()) { - // handle null value - let Some(value) = value else { + // handle null value via validity bitmap check + if !values.is_valid(i) { let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { payload } else { let payload = make_payload_fn(None); - let null_index = self.builder.len(); - self.builder.append_null(); + let null_index = self.views.len(); + self.views.push(0); + self.nulls.push(true); self.null = Some((payload, null_index)); payload }; observe_payload_fn(payload); continue; - }; + } - // get the value as bytes - let value: &[u8] = value.as_ref(); + // Extract length from the view (first 4 bytes of u128 in little-endian) + let len = view_u128 as u32; - let entry = self.map.find_mut(hash, |header| { - if header.hash != hash { - return false; - } - let v = self.builder.get_value(header.view_idx); + // Check if value already exists + let maybe_payload = { + // Borrow completed and in_progress for comparison + let completed = &self.completed; + let in_progress = &self.in_progress; - v == value - }); + self.map + .find(hash, |header| { + if header.hash != hash { + return false; + } + + // Fast path: inline strings can be compared directly + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix + let stored_prefix = (header.view >> 32) as u32; + let input_prefix = (view_u128 >> 32) as u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - compare full bytes + let byte_view = ByteView::from(header.view); + let stored_len = byte_view.length as usize; + let buffer_index = byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + + let stored_value = if buffer_index < completed.len() { + &completed[buffer_index].as_slice() + [offset..offset + stored_len] + } else { + &in_progress[offset..offset + stored_len] + }; + let input_value: &[u8] = values.value(i).as_ref(); + stored_value == input_value + }) + .map(|entry| entry.payload) + }; - let payload = if let Some(entry) = entry { - entry.payload + let payload = if let Some(payload) = maybe_payload { + payload } else { - // no existing value, make a new one. + // no existing value, make a new one + let value: &[u8] = values.value(i).as_ref(); let payload = make_payload_fn(Some(value)); - let inner_view_idx = self.builder.len(); + // Create view pointing to our buffers + let new_view = self.append_value(value); let new_header = Entry { - view_idx: inner_view_idx, + view: new_view, hash, payload, }; - self.builder.append_value(value); - self.map .insert_accounted(new_header, |h| h.hash, &mut self.map_size); payload @@ -310,29 +363,67 @@ where /// /// The values are guaranteed to be returned in the same order in which /// they were first seen. - pub fn into_state(self) -> ArrayRef { - let mut builder = self.builder; - match self.output_type { - OutputType::BinaryView => { - let array = builder.finish(); + pub fn into_state(mut self) -> ArrayRef { + // Flush any remaining in-progress buffer + if !self.in_progress.is_empty() { + let flushed = std::mem::take(&mut self.in_progress); + self.completed.push(Buffer::from_vec(flushed)); + } - Arc::new(array) - } + // Build null buffer if we have any nulls + let null_buffer = if self.nulls.iter().any(|&is_null| is_null) { + Some(NullBuffer::from( + self.nulls + .iter() + .map(|&is_null| !is_null) + .collect::>(), + )) + } else { + None + }; + + let views = ScalarBuffer::from(self.views); + let array = + unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) }; + + match self.output_type { + OutputType::BinaryView => Arc::new(array), OutputType::Utf8View => { - // SAFETY: - // we asserted the input arrays were all the correct type and - // thus since all the values that went in were valid (e.g. utf8) - // so are all the values that come out - let array = builder.finish(); + // SAFETY: all input was valid utf8 let array = unsafe { array.to_string_view_unchecked() }; Arc::new(array) } - _ => { - unreachable!("Utf8/Binary should use `ArrowBytesMap`") - } + _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"), } } + /// Append a value to our buffers and return the view pointing to it + fn append_value(&mut self, value: &[u8]) -> u128 { + let len = value.len(); + let view = if len <= 12 { + make_view(value, 0, 0) + } else { + // Ensure buffer is big enough + if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { + let flushed = std::mem::replace( + &mut self.in_progress, + Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), + ); + self.completed.push(Buffer::from_vec(flushed)); + } + + let buffer_index = self.completed.len() as u32; + let offset = self.in_progress.len() as u32; + self.in_progress.extend_from_slice(value); + + make_view(value, buffer_index, offset) + }; + + self.views.push(view); + self.nulls.push(false); + view + } + /// Total number of entries (including null, if present) pub fn len(&self) -> usize { self.non_null_len() + self.null.map(|_| 1).unwrap_or(0) @@ -351,8 +442,16 @@ where /// Return the total size, in bytes, of memory used to store the data in /// this set, not including `self` pub fn size(&self) -> usize { + let views_size = self.views.len() * size_of::(); + let in_progress_size = self.in_progress.capacity(); + let completed_size: usize = self.completed.iter().map(|b| b.len()).sum(); + let nulls_size = self.nulls.len(); + self.map_size - + self.builder.allocated_size() + + views_size + + in_progress_size + + completed_size + + nulls_size + self.hashes_buffer.allocated_size() } } @@ -365,7 +464,8 @@ where f.debug_struct("ArrowBytesMap") .field("map", &"") .field("map_size", &self.map_size) - .field("view_builder", &self.builder) + .field("views_len", &self.views.len()) + .field("completed_buffers", &self.completed.len()) .field("random_state", &self.random_state) .field("hashes_buffer", &self.hashes_buffer) .finish() @@ -373,13 +473,20 @@ where } /// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details +/// +/// Stores the view pointing to our internal buffers, eliminating the need +/// for a separate builder index. For inline strings (<=12 bytes), the view +/// contains the entire value. For out-of-line strings, the view contains +/// buffer_index and offset pointing directly to our storage. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] struct Entry where V: Debug + PartialEq + Eq + Clone + Copy + Default, { - /// The idx into the views array - view_idx: usize, + /// The u128 view pointing to our internal buffers. For inline strings, + /// this contains the complete value. For larger strings, this contains + /// the buffer_index/offset into our completed/in_progress buffers. + view: u128, hash: u64,