diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index a2442d496a5..2760b116f7d 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -141,10 +141,13 @@ impl FieldEncoder for BlobStructuralEncoder { let def = repdef.definition_levels.as_ref(); let def_meaning: Arc<[DefinitionInterpretation]> = repdef.def_meaning.into(); - if self.def_meaning.is_none() { - self.def_meaning = Some(def_meaning.clone()); - } else { - debug_assert_eq!(self.def_meaning.as_ref().unwrap(), &def_meaning); + match self.def_meaning.as_ref() { + None => { + self.def_meaning = Some(def_meaning.clone()); + } + Some(existing) => { + debug_assert_eq!(existing, &def_meaning); + } } // Collect positions and sizes diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 9e22c414d48..58c45b7092d 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -66,8 +66,7 @@ use crate::{ }; use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result}; -use crate::constants::DICT_SIZE_RATIO_META_KEY; -use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE; +use crate::constants::{DICT_DIVISOR_META_KEY, DICT_SIZE_RATIO_META_KEY}; use crate::version::LanceFileVersion; use crate::{ buffer::LanceBuffer, @@ -92,6 +91,9 @@ pub mod fullzip; pub mod miniblock; const FILL_BYTE: u8 = 0xFE; +const DEFAULT_DICT_DIVISOR: u64 = 2; +const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000; +const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8; struct PageLoadTask { decoder_fut: BoxFuture<'static, Result>>, @@ -2295,27 +2297,28 @@ impl StructuralPageScheduler for FullZipScheduler { io: &Arc, ) -> BoxFuture<'a, Result>> { // Check if caching is enabled and we have a repetition index - if self.enable_cache && self.rep_index.is_some() { - let rep_index = self.rep_index.as_ref().unwrap(); - // Calculate the total size of the repetition index - let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value; - let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size); - - // Load the repetition index buffer - let io_clone = io.clone(); - let future = async move { - let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?; - let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1); - - // Create and return the cacheable state - Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc) - }; + if self.enable_cache { + if let Some(rep_index) = self.rep_index.as_ref() { + // Calculate the total size of the repetition index + let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value; + let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size); + + // Load the repetition index buffer + let io_clone = io.clone(); + let future = async move { + let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?; + let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1); + + // Create and return the cacheable state + Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) + as Arc) + }; - future.boxed() - } else { - // Caching disabled or no repetition index, skip caching - std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc)).boxed() + return future.boxed(); + } } + // Caching disabled or no repetition index, skip caching + std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc)).boxed() } /// Loads previously cached repetition index data from the cache system. @@ -3449,6 +3452,12 @@ struct SerializedMiniBlockPage { metadata: LanceBuffer, } +#[derive(Debug, Clone, Copy)] +struct DictEncodingBudget { + max_dict_entries: u32, + max_encoded_size: usize, +} + impl PrimitiveStructuralEncoder { pub fn try_new( options: &EncodingOptions, @@ -4464,113 +4473,74 @@ impl PrimitiveStructuralEncoder { }) } - /// Estimates the total size of dictionary-encoded data - /// - /// Dictionary encoding splits data into two parts: - /// 1. Dictionary: stores unique values - /// 2. Indices: maps each value to a dictionary entry - /// - /// For FixedWidth: - /// - Dictionary values: cardinality × (bits_per_value / 8) - /// - Indices: num_values × 4 bytes (32-bit i32) - /// - /// For VariableWidth (strings/binary): - /// - Dictionary values: cardinality × avg_value_size (actual data) - /// - Indices: num_values × 4 bytes (32-bit i32) - fn estimate_dict_size(data_block: &DataBlock, version: LanceFileVersion) -> Option { - let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) { - cardinality_array.as_primitive::().value(0) - } else { - return None; - }; + fn should_dictionary_encode( + data_block: &DataBlock, + field: &Field, + version: LanceFileVersion, + ) -> Option { + const DEFAULT_SAMPLE_SIZE: usize = 4096; + const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98; - let num_values = data_block.num_values(); - if num_values == 0 { - return None; - } + // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip + // estimating the size for other types. match data_block { DataBlock::FixedWidth(fixed) => { if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 { return None; } - // The current fixed-width dictionary encoding uses i32 indices. - if cardinality > i32::MAX as u64 { - return None; - } - // We currently only support dictionary encoding for 64-bit and 128-bit fixed-width values. if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 { return None; } if fixed.bits_per_value % 8 != 0 { return None; } - // Dictionary: cardinality unique values at value bit width - let dict_size = cardinality * (fixed.bits_per_value / 8); - // Indices: num_values indices at 32 bits each - let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8); - Some(dict_size + indices_size) } DataBlock::VariableWidth(var) => { - // Only 32-bit and 64-bit offsets are supported if var.bits_per_offset != 32 && var.bits_per_offset != 64 { return None; } - if cardinality > i32::MAX as u64 { - return None; - } - - let bytes_per_offset = var.bits_per_offset as u64 / 8; - let avg_value_size = (var.data.len() as u64) / num_values; - - let dict_values_size = cardinality.checked_mul(avg_value_size)?; - let dict_offsets_size = cardinality.checked_mul(bytes_per_offset)?; - let indices_size = num_values.checked_mul(DICT_INDICES_BITS_PER_VALUE / 8)?; - - dict_values_size - .checked_add(dict_offsets_size)? - .checked_add(indices_size) - } - _ => None, - } - } - - fn should_dictionary_encode( - data_block: &DataBlock, - field: &Field, - version: LanceFileVersion, - ) -> bool { - // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip - // estimating the size - match data_block { - DataBlock::FixedWidth(fixed) => { - if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 { - return false; - } - if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 { - return false; - } - } - DataBlock::VariableWidth(_) => {} - _ => return false, - } - - // Currently VariableWidth only supports 32 and 64 bits - if let DataBlock::VariableWidth(var) = data_block { - if var.bits_per_offset != 32 && var.bits_per_offset != 64 { - return false; } + _ => return None, } - // Don't dictionary encode tiny arrays + // Don't dictionary encode tiny arrays. let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL") .ok() .and_then(|val| val.parse().ok()) .unwrap_or(100); if data_block.num_values() < too_small { - return false; + return None; } - // Get size ratio from metadata or env var, default to 0.8 + let num_values = data_block.num_values(); + + // Apply divisor threshold and cap. This is intentionally conservative: the goal is to + // avoid spending too much CPU trying to estimate very high cardinalities. + let divisor: u64 = field + .metadata + .get(DICT_DIVISOR_META_KEY) + .and_then(|val| val.parse().ok()) + .or_else(|| { + env::var("LANCE_ENCODING_DICT_DIVISOR") + .ok() + .and_then(|val| val.parse().ok()) + }) + .unwrap_or(DEFAULT_DICT_DIVISOR); + + let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY); + + let threshold_cardinality = num_values + .checked_div(divisor.max(1)) + .unwrap_or(0) + .min(max_cardinality); + if threshold_cardinality == 0 { + return None; + } + + // Get size ratio from metadata or env var. let threshold_ratio = field .metadata .get(DICT_SIZE_RATIO_META_KEY) @@ -4580,9 +4550,8 @@ impl PrimitiveStructuralEncoder { .ok() .and_then(|val| val.parse().ok()) }) - .unwrap_or(0.8); + .unwrap_or(DEFAULT_DICT_SIZE_RATIO); - // Validate size ratio is in valid range if threshold_ratio <= 0.0 || threshold_ratio > 1.0 { panic!( "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].", @@ -4590,20 +4559,117 @@ impl PrimitiveStructuralEncoder { ); } - // Get raw data size let data_size = data_block.data_size(); + if data_size == 0 { + return None; + } - // Estimate dictionary-encoded size - let Some(encoded_size) = Self::estimate_dict_size(data_block, version) else { - return false; - }; + let max_encoded_size = (data_size as f64 * threshold_ratio) as u64; + let max_encoded_size = usize::try_from(max_encoded_size).ok()?; - let size_ratio_actual = if data_size > 0 { - encoded_size as f64 / data_size as f64 - } else { - return false; - }; - size_ratio_actual < threshold_ratio + // Avoid probing dictionary encoding on data that appears to be near-unique. + if Self::sample_is_near_unique( + data_block, + DEFAULT_SAMPLE_SIZE, + DEFAULT_SAMPLE_UNIQUE_RATIO, + )? { + return None; + } + + let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?; + Some(DictEncodingBudget { + max_dict_entries, + max_encoded_size, + }) + } + + /// Probe whether a page looks near-unique before attempting dictionary encoding. + /// + /// The probe uses deterministic stride sampling (not RNG sampling), which keeps + /// the check cheap and reproducible across runs. The result is only a gate for + /// whether we try dictionary encoding, not a cardinality statistic. + fn sample_is_near_unique( + data_block: &DataBlock, + max_samples: usize, + unique_ratio_threshold: f64, + ) -> Option { + use std::collections::HashSet; + + if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 { + return None; + } + + let num_values = usize::try_from(data_block.num_values()).ok()?; + if num_values == 0 { + return Some(false); + } + + let sample_count = num_values.min(max_samples).max(1); + // Uniform stride sampling across the page. + let step = (num_values / sample_count).max(1); + + match data_block { + DataBlock::FixedWidth(fixed) => match fixed.bits_per_value { + 64 => { + let values = fixed.data.borrow_to_typed_slice::(); + let values = values.as_ref(); + let mut unique: HashSet = HashSet::with_capacity(sample_count.min(1024)); + for idx in (0..num_values).step_by(step).take(sample_count) { + unique.insert(values.get(idx).copied()?); + } + let ratio = unique.len() as f64 / sample_count as f64; + // Avoid overreacting to tiny pages with too few samples. + Some(sample_count >= 1024 && ratio >= unique_ratio_threshold) + } + 128 => { + let values = fixed.data.borrow_to_typed_slice::(); + let values = values.as_ref(); + let mut unique: HashSet = HashSet::with_capacity(sample_count.min(1024)); + for idx in (0..num_values).step_by(step).take(sample_count) { + unique.insert(values.get(idx).copied()?); + } + let ratio = unique.len() as f64 / sample_count as f64; + Some(sample_count >= 1024 && ratio >= unique_ratio_threshold) + } + _ => Some(false), + }, + DataBlock::VariableWidth(var) => { + use xxhash_rust::xxh3::xxh3_64; + + // Hash variable-width slices instead of storing borrowed slice keys. + let mut unique: HashSet = HashSet::with_capacity(sample_count.min(1024)); + match var.bits_per_offset { + 32 => { + let offsets_ref = var.offsets.borrow_to_typed_slice::(); + let offsets: &[u32] = offsets_ref.as_ref(); + for i in (0..num_values).step_by(step).take(sample_count) { + let start = usize::try_from(*offsets.get(i)?).ok()?; + let end = usize::try_from(*offsets.get(i + 1)?).ok()?; + if start > end || end > var.data.len() { + return None; + } + unique.insert(xxh3_64(&var.data[start..end])); + } + } + 64 => { + let offsets_ref = var.offsets.borrow_to_typed_slice::(); + let offsets: &[u64] = offsets_ref.as_ref(); + for i in (0..num_values).step_by(step).take(sample_count) { + let start = usize::try_from(*offsets.get(i)?).ok()?; + let end = usize::try_from(*offsets.get(i + 1)?).ok()?; + if start > end || end > var.data.len() { + return None; + } + unique.insert(xxh3_64(&var.data[start..end])); + } + } + _ => return Some(false), + } + let ratio = unique.len() as f64 / sample_count as f64; + Some(sample_count >= 1024 && ratio >= unique_ratio_threshold) + } + _ => Some(false), + } } // Creates an encode task, consuming all buffered data @@ -4738,16 +4804,19 @@ impl PrimitiveStructuralEncoder { } else { // Try dictionary encoding first if applicable. If encoding aborts, fall back to the // preferred structural encoding. - let dict_result = if Self::should_dictionary_encode(&data_block, &field, version) { - log::debug!( - "Encoding column {} with {} items using dictionary encoding (mini-block layout)", - column_idx, - num_values - ); - dict::dictionary_encode(data_block.clone()) - } else { - None - }; + let dict_result = Self::should_dictionary_encode(&data_block, &field, version) + .and_then(|budget| { + log::debug!( + "Encoding column {} with {} items using dictionary encoding (mini-block layout)", + column_idx, + num_values + ); + dict::dictionary_encode( + &data_block, + budget.max_dict_entries, + budget.max_encoded_size, + ) + }); if let Some((indices_data_block, dictionary_data_block)) = dict_result { Self::encode_miniblock( @@ -4914,7 +4983,7 @@ mod tests { use crate::format::ProtobufUtils21; use crate::testing::{check_round_trip_encoding_of_data, TestCases}; use crate::version::LanceFileVersion; - use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array}; + use arrow_array::{ArrayRef, Int8Array, StringArray}; use arrow_schema::DataType; use std::collections::HashMap; use std::{collections::VecDeque, sync::Arc}; @@ -6110,33 +6179,38 @@ mod tests { } // Dictionary encoding decision tests - /// Helper to create FixedWidth test data block with exact cardinality stat injected - /// to ensure consistent test behavior (avoids HLL estimation error) fn create_test_fixed_data_block( num_values: u64, cardinality: u64, bits_per_value: u64, ) -> DataBlock { - use crate::statistics::Stat; - + assert!(cardinality > 0); + assert!(cardinality <= num_values); let block_info = BlockInfo::default(); - // Manually inject exact cardinality stat for consistent test behavior - let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality])); - block_info - .0 - .write() - .unwrap() - .insert(Stat::Cardinality, cardinality_array); - assert_eq!(bits_per_value % 8, 0); - let bytes_per_value = bits_per_value / 8; + let data = match bits_per_value { + 32 => { + let values = (0..num_values) + .map(|i| (i % cardinality) as u32) + .collect::>(); + crate::buffer::LanceBuffer::reinterpret_vec(values) + } + 64 => { + let values = (0..num_values).map(|i| i % cardinality).collect::>(); + crate::buffer::LanceBuffer::reinterpret_vec(values) + } + 128 => { + let values = (0..num_values) + .map(|i| (i % cardinality) as u128) + .collect::>(); + crate::buffer::LanceBuffer::reinterpret_vec(values) + } + _ => unreachable!(), + }; DataBlock::FixedWidth(FixedWidthDataBlock { bits_per_value, - data: crate::buffer::LanceBuffer::from(vec![ - 0u8; - (num_values * bytes_per_value) as usize - ]), + data, num_values, block_info, }) @@ -6144,7 +6218,6 @@ mod tests { /// Helper to create VariableWidth (string) test data block with exact cardinality fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock { - use crate::statistics::Stat; use arrow_array::StringArray; assert!(cardinality <= num_values && cardinality > 0); @@ -6155,61 +6228,41 @@ mod tests { } let array = StringArray::from(values); - let block = DataBlock::from_array(Arc::new(array) as ArrayRef); - - // Manually inject stats for consistent test behavior - if let DataBlock::VariableWidth(ref var_block) = block { - let mut info = var_block.block_info.0.write().unwrap(); - // Cardinality: exact value to avoid HLL estimation error - info.insert( - Stat::Cardinality, - Arc::new(UInt64Array::from(vec![cardinality])), - ); - } - - block + DataBlock::from_array(Arc::new(array) as ArrayRef) } #[test] - fn test_estimate_dict_size_fixed_width() { - use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE; - - let bits_per_value = 128; - let block = create_test_fixed_data_block(1000, 400, bits_per_value); - let estimated_size = - PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap(); - - // Dictionary: 400 * 16 bytes (128-bit values) - // Indices: 1000 * 4 bytes (32-bit i32) - let expected_dict_size = 400 * (bits_per_value / 8); - let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8); - let expected_total = expected_dict_size + expected_indices_size; - - assert_eq!(estimated_size, expected_total); - } + fn test_should_dictionary_encode() { + use crate::constants::DICT_SIZE_RATIO_META_KEY; + use lance_core::datatypes::Field as LanceField; - #[test] - fn test_estimate_dict_size_variable_width() { - let block = create_test_variable_width_block(1000, 400); - let estimated_size = - PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap(); + // Create data where dict encoding saves space + let block = create_test_variable_width_block(1000, 10); - // Get actual data size - let data_size = block.data_size(); - let avg_value_size = data_size / 1000; + let mut metadata = HashMap::new(); + metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string()); + let arrow_field = + arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata); + let field = LanceField::try_from(&arrow_field).unwrap(); - let expected = 400 * avg_value_size + 1000 * 4; + let result = PrimitiveStructuralEncoder::should_dictionary_encode( + &block, + &field, + LanceFileVersion::V2_1, + ); - assert_eq!(estimated_size, expected); + assert!( + result.is_some(), + "Should use dictionary encode based on size" + ); } #[test] - fn test_should_dictionary_encode() { + fn test_should_not_dictionary_encode_unsupported_bits() { use crate::constants::DICT_SIZE_RATIO_META_KEY; use lance_core::datatypes::Field as LanceField; - // Create data where dict encoding saves space - let block = create_test_variable_width_block(1000, 10); + let block = create_test_fixed_data_block(1000, 1000, 32); let mut metadata = HashMap::new(); metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string()); @@ -6223,20 +6276,24 @@ mod tests { LanceFileVersion::V2_1, ); - assert!(result, "Should use dictionary encode based on size"); + assert!( + result.is_none(), + "Should not use dictionary encode for unsupported bit width" + ); } #[test] - fn test_should_not_dictionary_encode() { + fn test_should_not_dictionary_encode_near_unique_sample() { use crate::constants::DICT_SIZE_RATIO_META_KEY; use lance_core::datatypes::Field as LanceField; - let block = create_test_fixed_data_block(1000, 1000, 128); + let num_values = 5000; + let block = create_test_variable_width_block(num_values, num_values); let mut metadata = HashMap::new(); - metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string()); + metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string()); let arrow_field = - arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata); + arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata); let field = LanceField::try_from(&arrow_field).unwrap(); let result = PrimitiveStructuralEncoder::should_dictionary_encode( @@ -6245,7 +6302,10 @@ mod tests { LanceFileVersion::V2_1, ); - assert!(!result, "Should not use dictionary encode based on size"); + assert!( + result.is_none(), + "Should not probe dictionary encoding for near-unique data" + ); } async fn encode_first_page( diff --git a/rust/lance-encoding/src/encodings/logical/primitive/dict.rs b/rust/lance-encoding/src/encodings/logical/primitive/dict.rs index 22bb0b9a009..c34c3dba2c6 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/dict.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive/dict.rs @@ -115,7 +115,8 @@ pub fn normalize_dict_nulls(array: Arc) -> Result> { fn dict_encode_variable_width( variable_width_data_block: &VariableWidthBlock, bits_per_offset: u8, - cardinality: u64, + max_dict_entries: u32, + max_encoded_size: usize, ) -> Option<(DataBlock, DataBlock)> where T: ArrowNativeType, @@ -134,21 +135,17 @@ where let max_len = max_len.as_primitive::().value(0); let max_dict_data_len = variable_width_data_block.data.len(); - let expected_dict_data_len = max_len - .checked_mul(cardinality) - .and_then(|v| >::try_from(v).ok()); - let dict_data_capacity = expected_dict_data_len - .map(|len| len.min(max_dict_data_len)) - .unwrap_or(max_dict_data_len); + let max_len: usize = max_len.try_into().unwrap_or(usize::MAX); + let dict_data_capacity = max_len + .saturating_mul(32) + .max(1024) + .min(max_dict_data_len) + .min(max_encoded_size); let mut dictionary_buffer: Vec = Vec::with_capacity(dict_data_capacity); let mut dictionary_offsets_buffer = vec![T::default()]; let mut curr_idx = 0; let mut indices_buffer = Vec::with_capacity(variable_width_data_block.num_values as usize); - let original_size = variable_width_data_block - .data_size() - .try_into() - .unwrap_or(usize::MAX); let bytes_per_offset = (bits_per_offset / 8) as usize; for window in offsets.windows(2) { @@ -163,6 +160,9 @@ where let idx = match map.entry(U8SliceKey(key)) { Entry::Occupied(entry) => *entry.get(), Entry::Vacant(entry) => { + if max_dict_entries == 0 || curr_idx as u32 >= max_dict_entries { + return None; + } if curr_idx == i32::MAX { return None; } @@ -188,7 +188,7 @@ where .len() .saturating_add(indices_bytes) .saturating_add(offsets_bytes); - if encoded_size > original_size { + if encoded_size > max_encoded_size { return None; } } @@ -218,15 +218,13 @@ where /// Currently only supported for some common cases (string / binary / 64-bit / 128-bit) /// /// Returns a block of indices (will always be a fixed width data block) and a block of dictionary -pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBlock)> { - let cardinality = data_block - .get_stat(Stat::Cardinality) - .unwrap() - .as_primitive::() - .value(0); - let data_block_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); +pub fn dictionary_encode( + data_block: &DataBlock, + max_dict_entries: u32, + max_encoded_size: usize, +) -> Option<(DataBlock, DataBlock)> { match data_block { - DataBlock::FixedWidth(ref mut fixed_width_data_block) => { + DataBlock::FixedWidth(fixed_width_data_block) => { use std::collections::hash_map::Entry; let bytes_per_value = match fixed_width_data_block.bits_per_value { @@ -240,7 +238,8 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl let mut map = HashMap::new(); let u64_slice = fixed_width_data_block.data.borrow_to_typed_slice::(); let u64_slice = u64_slice.as_ref(); - let mut dictionary_buffer = Vec::with_capacity(cardinality as usize); + let mut dictionary_buffer = + Vec::with_capacity((fixed_width_data_block.num_values as usize).min(1024)); let mut indices_buffer = Vec::with_capacity(fixed_width_data_block.num_values as usize); let mut curr_idx: i32 = 0; @@ -249,6 +248,9 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl let idx = match map.entry(value) { Entry::Occupied(entry) => *entry.get(), Entry::Vacant(entry) => { + if max_dict_entries == 0 || curr_idx as u32 >= max_dict_entries { + return None; + } if curr_idx == i32::MAX { return None; } @@ -265,7 +267,7 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl .len() .saturating_mul(DICT_INDICES_BITS_PER_VALUE as usize / 8); let encoded_size = dict_bytes.saturating_add(indices_bytes); - if encoded_size > data_block_size { + if encoded_size > max_encoded_size { return None; } } @@ -292,7 +294,8 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl let mut map = HashMap::new(); let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::(); let u128_slice = u128_slice.as_ref(); - let mut dictionary_buffer = Vec::with_capacity(cardinality as usize); + let mut dictionary_buffer = + Vec::with_capacity((fixed_width_data_block.num_values as usize).min(1024)); let mut indices_buffer = Vec::with_capacity(fixed_width_data_block.num_values as usize); let mut curr_idx: i32 = 0; @@ -301,6 +304,9 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl let idx = match map.entry(value) { Entry::Occupied(entry) => *entry.get(), Entry::Vacant(entry) => { + if max_dict_entries == 0 || curr_idx as u32 >= max_dict_entries { + return None; + } if curr_idx == i32::MAX { return None; } @@ -317,7 +323,7 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl .len() .saturating_mul(DICT_INDICES_BITS_PER_VALUE as usize / 8); let encoded_size = dict_bytes.saturating_add(indices_bytes); - if encoded_size > data_block_size { + if encoded_size > max_encoded_size { return None; } } @@ -342,10 +348,20 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBl _ => None, } } - DataBlock::VariableWidth(ref variable_width_data_block) => { + DataBlock::VariableWidth(variable_width_data_block) => { match variable_width_data_block.bits_per_offset { - 32 => dict_encode_variable_width::(variable_width_data_block, 32, cardinality), - 64 => dict_encode_variable_width::(variable_width_data_block, 64, cardinality), + 32 => dict_encode_variable_width::( + variable_width_data_block, + 32, + max_dict_entries, + max_encoded_size, + ), + 64 => dict_encode_variable_width::( + variable_width_data_block, + 64, + max_dict_entries, + max_encoded_size, + ), _ => None, } } @@ -386,7 +402,8 @@ mod tests { data_block.compute_stat(); // Dictionary encoding should abort and return None - let result = dictionary_encode(data_block); + let max_encoded_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + let result = dictionary_encode(&data_block, 1000, max_encoded_size); assert!( result.is_none(), "Dictionary encoding should abort for high cardinality u128 data" @@ -416,7 +433,8 @@ mod tests { data_block.compute_stat(); // Dictionary encoding should succeed and return Some - let result = dictionary_encode(data_block); + let max_encoded_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + let result = dictionary_encode(&data_block, 1000, max_encoded_size); assert!( result.is_some(), "Dictionary encoding should succeed for low cardinality u128 data" @@ -455,7 +473,8 @@ mod tests { let data_block = DataBlock::from_array(Arc::new(array) as Arc); // Dictionary encoding should abort and return None - let result = dictionary_encode(data_block); + let max_encoded_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + let result = dictionary_encode(&data_block, 10, max_encoded_size); assert!( result.is_none(), "Dictionary encoding should abort for high cardinality string data" @@ -477,7 +496,8 @@ mod tests { let data_block = DataBlock::from_array(Arc::new(array) as Arc); // Dictionary encoding should succeed and return Some - let result = dictionary_encode(data_block); + let max_encoded_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + let result = dictionary_encode(&data_block, 100, max_encoded_size); assert!( result.is_some(), "Dictionary encoding should succeed for low cardinality data" @@ -512,6 +532,44 @@ mod tests { } other => panic!("Expected VariableWidth data block, got {:?}", other), }; - assert!(dictionary_encode(invalid_block).is_none()); + let max_encoded_size = usize::try_from(invalid_block.data_size()).unwrap_or(usize::MAX); + assert!(dictionary_encode(&invalid_block, 100, max_encoded_size).is_none()); + } + + #[test] + fn test_dictionary_encode_respects_size_limit() { + let num_values = 10_000u64; + let cardinality = 50u64; + + let mut values = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + values.push(format!("value_{:08}", i % cardinality)); + } + + let array = StringArray::from(values); + let data_block = DataBlock::from_array(Arc::new(array) as Arc); + + let full_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + let too_small_limit = full_size / 10; + assert!(dictionary_encode(&data_block, 1000, too_small_limit).is_none()); + assert!(dictionary_encode(&data_block, 1000, full_size).is_some()); + } + + #[test] + fn test_dictionary_encode_respects_entry_limit() { + let num_values = 10_000u64; + let cardinality = 200u64; + + let mut values = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + values.push(format!("value_{:08}", i % cardinality)); + } + + let array = StringArray::from(values); + let data_block = DataBlock::from_array(Arc::new(array) as Arc); + + let max_encoded_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); + assert!(dictionary_encode(&data_block, 10, max_encoded_size).is_none()); + assert!(dictionary_encode(&data_block, 500, max_encoded_size).is_some()); } } diff --git a/rust/lance-encoding/src/previous/encodings/physical.rs b/rust/lance-encoding/src/previous/encodings/physical.rs index 83c9287403e..9c3001d3d0f 100644 --- a/rust/lance-encoding/src/previous/encodings/physical.rs +++ b/rust/lance-encoding/src/previous/encodings/physical.rs @@ -47,14 +47,12 @@ fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) { /// Convert a protobuf buffer encoding into a physical page scheduler fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box { let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers); - let compression_config: CompressionConfig = if encoding.compression.is_none() { - CompressionConfig::new(CompressionScheme::None, None) - } else { - let compression = encoding.compression.as_ref().unwrap(); - CompressionConfig::new( + let compression_config: CompressionConfig = match encoding.compression.as_ref() { + None => CompressionConfig::new(CompressionScheme::None, None), + Some(compression) => CompressionConfig::new( compression.scheme.as_str().parse().unwrap(), compression.level, - ) + ), }; match encoding.bits_per_value { 1 => Box::new(DenseBitmapScheduler::new(buffer_offset)), diff --git a/rust/lance-encoding/src/statistics.rs b/rust/lance-encoding/src/statistics.rs index b5cb65ee879..d3362965c2f 100644 --- a/rust/lance-encoding/src/statistics.rs +++ b/rust/lance-encoding/src/statistics.rs @@ -78,13 +78,10 @@ impl ComputeStat for VariableWidthBlock { let data_size = self.data_size(); let data_size_array = Arc::new(UInt64Array::from(vec![data_size])); - let cardinality_array = self.cardinality(); - let max_length_array = self.max_length(); let mut info = self.block_info.0.write().unwrap(); info.insert(Stat::DataSize, data_size_array); - info.insert(Stat::Cardinality, cardinality_array); info.insert(Stat::MaxLength, max_length_array); } } @@ -189,12 +186,31 @@ impl GetStat for NullableDataBlock { impl GetStat for VariableWidthBlock { fn get_stat(&self, stat: Stat) -> Option> { - let block_info = self.block_info.0.read().unwrap(); + { + let block_info = self.block_info.0.read().unwrap(); + if block_info.is_empty() { + panic!("get_stat should be called after statistics are computed."); + } + if let Some(stat_value) = block_info.get(&stat) { + return Some(stat_value.clone()); + } + } + if stat != Stat::Cardinality { + return None; + } + + let computed = self.compute_cardinality(); + let mut block_info = self.block_info.0.write().unwrap(); if block_info.is_empty() { panic!("get_stat should be called after statistics are computed."); } - block_info.get(&stat).cloned() + Some( + block_info + .entry(stat) + .or_insert_with(|| computed.clone()) + .clone(), + ) } } @@ -216,7 +232,7 @@ impl GetStat for FixedSizeListBlock { impl VariableWidthBlock { // Caveat: the computation here assumes VariableWidthBlock.offsets maps directly to VariableWidthBlock.data // without any adjustment(for example, no null_adjustment for offsets) - fn cardinality(&mut self) -> Arc { + fn compute_cardinality(&self) -> Arc { const PRECISION: u8 = 4; // The default hasher (currently sip hash 1-3) does not seem to give good results // with HLL. @@ -1227,4 +1243,27 @@ mod tests { let info = fixed.block_info.0.read().unwrap(); assert!(info.contains_key(&Stat::Cardinality)); } + + #[test] + fn test_variable_width_cardinality_is_lazy() { + let string_array = StringArray::from(vec!["a", "b", "a"]); + let block = DataBlock::from_array(string_array); + + let DataBlock::VariableWidth(var) = &block else { + panic!("Expected VariableWidth datablock"); + }; + + { + let info = var.block_info.0.read().unwrap(); + assert!(info.contains_key(&Stat::DataSize)); + assert!(info.contains_key(&Stat::MaxLength)); + assert!(!info.contains_key(&Stat::Cardinality)); + } + + let cardinality = block.expect_single_stat::(Stat::Cardinality); + assert_eq!(cardinality, 2); + + let info = var.block_info.0.read().unwrap(); + assert!(info.contains_key(&Stat::Cardinality)); + } }