From 9ee76f5d14971b0fe1b45a1094a9a366a3ca2d09 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 29 Dec 2025 16:38:35 +0800 Subject: [PATCH 1/3] feat: optimize rle implementation --- rust/lance-encoding/src/compression.rs | 36 ++++++-- .../src/encodings/physical/general.rs | 1 + .../src/encodings/physical/rle.rs | 88 +++++++++---------- 3 files changed, 73 insertions(+), 52 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index bf106d9aa30..35b09f68064 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -69,8 +69,11 @@ use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result}; use snafu::location; use std::{str::FromStr, sync::Arc}; -/// Default threshold for RLE compression selection. -/// RLE is chosen when the run count is less than this fraction of total values. +/// Default threshold for RLE compression selection when the user explicitly provides a threshold. +/// +/// If no threshold is provided, we use a size model instead of a fixed run ratio. +/// This preserves existing behavior for users relying on the default, while making +/// the default selection more type-aware. const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5; // Minimum block size (32kb) to trigger general block compression @@ -168,12 +171,33 @@ fn try_rle_for_mini_block( return None; } + let type_size = (bits / 8) as u64; let run_count = data.expect_single_stat::(Stat::RunCount); - let threshold = params - .rle_threshold - .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); + let threshold = params.rle_threshold.unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); + + // If the user explicitly provided a threshold then honor it as an additional guard. + // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead. + let passes_threshold = match params.rle_threshold { + Some(_) => (run_count as f64) < (data.num_values as f64) * threshold, + None => true, + }; + + if !passes_threshold { + return None; + } + + // Estimate the encoded size. + // + // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into + // multiple entries of up to 255 values. We don't know the run length distribution here, + // so we conservatively account for splitting with an upper bound. + let num_values = data.num_values; + let estimated_pairs = (run_count + (num_values / 255)).min(num_values); + + let raw_bytes = (num_values as u128) * (type_size as u128); + let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128); - if (run_count as f64) < (data.num_values as f64) * threshold { + if rle_bytes < raw_bytes { return Some(Box::new(RleMiniBlockEncoder::new())); } None diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index e824c8aacfd..b760161b9f5 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -244,6 +244,7 @@ mod tests { DataBlock::from_array(array) } + fn run_round_trip_test(test_case: TestCase) { let compressor = GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression); diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 758e3307521..222bc7d1c24 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -58,6 +58,7 @@ use crate::data::DataBlock; use crate::data::{BlockInfo, FixedWidthDataBlock}; use crate::encodings::logical::primitive::miniblock::{ MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES, + MAX_MINIBLOCK_VALUES, }; use crate::format::pb21::CompressiveEncoding; use crate::format::ProtobufUtils21; @@ -199,12 +200,7 @@ impl RleMiniBlockEncoder { let type_size = std::mem::size_of::(); let chunk_start = offset * type_size; - // FIXME(xuanwo): we don't allow 4096 values as a workaround for https://github.com/lance-format/lance/issues/4429 - // Since while rep/def takes 4B, 4Ki values will lead to the - // generated chunk buffer too large.MAX_MINIBLOCK_VALUES - // - // let max_by_count = as usize; - let max_by_count = 2048usize; + let max_by_count = MAX_MINIBLOCK_VALUES as usize; let max_values = values_remaining.min(max_by_count); let chunk_end = chunk_start + max_values * type_size; @@ -229,19 +225,19 @@ impl RleMiniBlockEncoder { let mut bytes_used = 0usize; let mut total_values_encoded = 0usize; // Track total encoded values - // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes - // For smaller data types like u8, we can use larger initial checkpoints - // since they take less space per value - let checkpoints = match type_size { - 1 => vec![256, 512, 1024, 2048, 4096], // u8 can start from 256 - 2 => vec![128, 256, 512, 1024, 2048, 4096], // u16 can start from 128 - _ => vec![64, 128, 256, 512, 1024, 2048, 4096], // u32/u64: no difference + // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes. + // + // We start from a slightly larger minimum checkpoint for smaller types since + // they encode more compactly and are less likely to hit MAX_MINIBLOCK_BYTES. + let min_checkpoint_log2 = match type_size { + 1 => 8, // 256 + 2 => 7, // 128 + _ => 6, // 64 }; - let valid_checkpoints: Vec = checkpoints - .into_iter() - .filter(|&p| p <= values_remaining) - .collect(); - let mut checkpoint_idx = 0; + let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize)) + .next_power_of_two() + .ilog2(); + let mut checkpoint_log2 = min_checkpoint_log2; // Save state at checkpoints so we can roll back if needed let mut last_checkpoint_state = None; @@ -272,17 +268,19 @@ impl RleMiniBlockEncoder { current_length = 1; } - // Check if we reached a power-of-2 checkpoint - if checkpoint_idx < valid_checkpoints.len() - && total_values_encoded >= valid_checkpoints[checkpoint_idx] - { + // Check if we reached a power-of-2 checkpoint. + while checkpoint_log2 <= max_checkpoint_log2 { + let checkpoint_values = 1usize << checkpoint_log2; + if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values { + break; + } last_checkpoint_state = Some(( all_values.len(), all_lengths.len(), bytes_used, - valid_checkpoints[checkpoint_idx], + checkpoint_values, )); - checkpoint_idx += 1; + checkpoint_log2 += 1; } } @@ -426,7 +424,7 @@ impl RleMiniBlockDecompressor { Ok(DataBlock::FixedWidth(FixedWidthDataBlock { bits_per_value: self.bits_per_value, - data: LanceBuffer::from(decoded_data), + data: decoded_data, num_values, block_info: BlockInfo::default(), })) @@ -437,7 +435,7 @@ impl RleMiniBlockDecompressor { values_buffer: &LanceBuffer, lengths_buffer: &LanceBuffer, num_values: u64, - ) -> Result> + ) -> Result where T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType, { @@ -445,7 +443,7 @@ impl RleMiniBlockDecompressor { if values_buffer.is_empty() || lengths_buffer.is_empty() { if num_values == 0 { - return Ok(Vec::new()); + return Ok(LanceBuffer::empty()); } else { return Err(Error::InvalidInput { location: location!(), @@ -480,36 +478,34 @@ impl RleMiniBlockDecompressor { let values: &[T] = values_ref.as_ref(); let lengths: &[u8] = lengths_buffer.as_ref(); - let expected_byte_count = num_values as usize * type_size; - let mut decoded = Vec::with_capacity(expected_byte_count); + let expected_value_count = num_values as usize; + let mut decoded: Vec = Vec::with_capacity(expected_value_count); for (value, &length) in values.iter().zip(lengths.iter()) { - let run_length = length as usize; - let bytes_to_write = run_length * type_size; - let bytes_of_value = bytemuck::bytes_of(value); - - if decoded.len() + bytes_to_write > expected_byte_count { - let remaining_bytes = expected_byte_count - decoded.len(); - let remaining_values = remaining_bytes / type_size; - - for _ in 0..remaining_values { - decoded.extend_from_slice(bytes_of_value); - } + if decoded.len() == expected_value_count { break; } - for _ in 0..run_length { - decoded.extend_from_slice(bytes_of_value); + if length == 0 { + return Err(Error::InvalidInput { + location: location!(), + source: "RLE decoding encountered a zero run length".into(), + }); } + + let remaining = expected_value_count - decoded.len(); + let write_len = (length as usize).min(remaining); + + decoded.resize(decoded.len() + write_len, *value); } - if decoded.len() != expected_byte_count { + if decoded.len() != expected_value_count { return Err(Error::InvalidInput { location: location!(), source: format!( - "RLE decoding produced {} bytes, expected {}", + "RLE decoding produced {} values, expected {}", decoded.len(), - expected_byte_count + expected_value_count ) .into(), }); @@ -520,7 +516,7 @@ impl RleMiniBlockDecompressor { num_values, std::any::type_name::() ); - Ok(decoded) + Ok(LanceBuffer::reinterpret_vec(decoded)) } } From 83e1a3ac363b9e9b6a7ad52b5b24b6d312a2fad5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 29 Dec 2025 16:39:17 +0800 Subject: [PATCH 2/3] format code --- rust/lance-encoding/src/compression.rs | 4 +++- rust/lance-encoding/src/encodings/physical/general.rs | 1 - rust/lance-encoding/src/encodings/physical/rle.rs | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 35b09f68064..40b4b6b5548 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -173,7 +173,9 @@ fn try_rle_for_mini_block( let type_size = (bits / 8) as u64; let run_count = data.expect_single_stat::(Stat::RunCount); - let threshold = params.rle_threshold.unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); + let threshold = params + .rle_threshold + .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); // If the user explicitly provided a threshold then honor it as an additional guard. // A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead. diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index b760161b9f5..e824c8aacfd 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -244,7 +244,6 @@ mod tests { DataBlock::from_array(array) } - fn run_round_trip_test(test_case: TestCase) { let compressor = GeneralMiniBlockCompressor::new(test_case.inner_encoder, test_case.compression); diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 222bc7d1c24..9ce9d70cbdf 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -271,7 +271,8 @@ impl RleMiniBlockEncoder { // Check if we reached a power-of-2 checkpoint. while checkpoint_log2 <= max_checkpoint_log2 { let checkpoint_values = 1usize << checkpoint_log2; - if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values { + if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values + { break; } last_checkpoint_state = Some(( From 54f355fc6be12452351e24313da41addf5462eca Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 29 Dec 2025 16:43:28 +0800 Subject: [PATCH 3/3] Make clippy happy --- rust/lance-encoding/src/compression.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 40b4b6b5548..8e2edbaaec4 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -171,7 +171,7 @@ fn try_rle_for_mini_block( return None; } - let type_size = (bits / 8) as u64; + let type_size = bits / 8; let run_count = data.expect_single_stat::(Stat::RunCount); let threshold = params .rle_threshold