Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions rust/lance-encoding/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
use snafu::location;
use std::{str::FromStr, sync::Arc};

/// Default threshold for RLE compression selection.
/// RLE is chosen when the run count is less than this fraction of total values.
/// Default threshold for RLE compression selection when the user explicitly provides a threshold.
///
/// If no threshold is provided, we use a size model instead of a fixed run ratio.
/// This preserves existing behavior for users relying on the default, while making
/// the default selection more type-aware.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

// Minimum block size (32kb) to trigger general block compression
Expand Down Expand Up @@ -168,12 +171,35 @@ fn try_rle_for_mini_block(
return None;
}

let type_size = bits / 8;
let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
let threshold = params
.rle_threshold
.unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);

if (run_count as f64) < (data.num_values as f64) * threshold {
// If the user explicitly provided a threshold then honor it as an additional guard.
// A lower threshold makes RLE harder to trigger and can be used to avoid CPU overhead.
let passes_threshold = match params.rle_threshold {
Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
None => true,
};

if !passes_threshold {
return None;
}

// Estimate the encoded size.
//
// RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
// multiple entries of up to 255 values. We don't know the run length distribution here,
// so we conservatively account for splitting with an upper bound.
let num_values = data.num_values;
let estimated_pairs = (run_count + (num_values / 255)).min(num_values);

let raw_bytes = (num_values as u128) * (type_size as u128);
let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);

if rle_bytes < raw_bytes {
return Some(Box::new(RleMiniBlockEncoder::new()));
}
None
Expand Down
89 changes: 43 additions & 46 deletions rust/lance-encoding/src/encodings/physical/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::data::DataBlock;
use crate::data::{BlockInfo, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::ProtobufUtils21;
Expand Down Expand Up @@ -199,12 +200,7 @@ impl RleMiniBlockEncoder {
let type_size = std::mem::size_of::<T>();

let chunk_start = offset * type_size;
// FIXME(xuanwo): we don't allow 4096 values as a workaround for https://github.com/lance-format/lance/issues/4429
// Since rep/def takes 4B per value, 4Ki values will lead to the
// generated chunk buffer being too large.
//
// let max_by_count = MAX_MINIBLOCK_VALUES as usize;
let max_by_count = 2048usize;
let max_by_count = MAX_MINIBLOCK_VALUES as usize;
let max_values = values_remaining.min(max_by_count);
let chunk_end = chunk_start + max_values * type_size;

Expand All @@ -229,19 +225,19 @@ impl RleMiniBlockEncoder {
let mut bytes_used = 0usize;
let mut total_values_encoded = 0usize; // Track total encoded values

// Power-of-2 checkpoints for ensuring non-last chunks have valid sizes
// For smaller data types like u8, we can use larger initial checkpoints
// since they take less space per value
let checkpoints = match type_size {
1 => vec![256, 512, 1024, 2048, 4096], // u8 can start from 256
2 => vec![128, 256, 512, 1024, 2048, 4096], // u16 can start from 128
_ => vec![64, 128, 256, 512, 1024, 2048, 4096], // u32/u64: no difference
// Power-of-2 checkpoints for ensuring non-last chunks have valid sizes.
//
// We start from a slightly larger minimum checkpoint for smaller types since
// they encode more compactly and are less likely to hit MAX_MINIBLOCK_BYTES.
let min_checkpoint_log2 = match type_size {
1 => 8, // 256
2 => 7, // 128
_ => 6, // 64
};
let valid_checkpoints: Vec<usize> = checkpoints
.into_iter()
.filter(|&p| p <= values_remaining)
.collect();
let mut checkpoint_idx = 0;
let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
.next_power_of_two()
.ilog2();
let mut checkpoint_log2 = min_checkpoint_log2;

// Save state at checkpoints so we can roll back if needed
let mut last_checkpoint_state = None;
Expand Down Expand Up @@ -272,17 +268,20 @@ impl RleMiniBlockEncoder {
current_length = 1;
}

// Check if we reached a power-of-2 checkpoint
if checkpoint_idx < valid_checkpoints.len()
&& total_values_encoded >= valid_checkpoints[checkpoint_idx]
{
// Check if we reached a power-of-2 checkpoint.
while checkpoint_log2 <= max_checkpoint_log2 {
let checkpoint_values = 1usize << checkpoint_log2;
if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
{
break;
}
last_checkpoint_state = Some((
all_values.len(),
all_lengths.len(),
bytes_used,
valid_checkpoints[checkpoint_idx],
checkpoint_values,
));
checkpoint_idx += 1;
checkpoint_log2 += 1;
}
}

Expand Down Expand Up @@ -426,7 +425,7 @@ impl RleMiniBlockDecompressor {

Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
bits_per_value: self.bits_per_value,
data: LanceBuffer::from(decoded_data),
data: decoded_data,
num_values,
block_info: BlockInfo::default(),
}))
Expand All @@ -437,15 +436,15 @@ impl RleMiniBlockDecompressor {
values_buffer: &LanceBuffer,
lengths_buffer: &LanceBuffer,
num_values: u64,
) -> Result<Vec<u8>>
) -> Result<LanceBuffer>
where
T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
{
let type_size = std::mem::size_of::<T>();

if values_buffer.is_empty() || lengths_buffer.is_empty() {
if num_values == 0 {
return Ok(Vec::new());
return Ok(LanceBuffer::empty());
} else {
return Err(Error::InvalidInput {
location: location!(),
Expand Down Expand Up @@ -480,36 +479,34 @@ impl RleMiniBlockDecompressor {
let values: &[T] = values_ref.as_ref();
let lengths: &[u8] = lengths_buffer.as_ref();

let expected_byte_count = num_values as usize * type_size;
let mut decoded = Vec::with_capacity(expected_byte_count);
let expected_value_count = num_values as usize;
let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);

for (value, &length) in values.iter().zip(lengths.iter()) {
let run_length = length as usize;
let bytes_to_write = run_length * type_size;
let bytes_of_value = bytemuck::bytes_of(value);

if decoded.len() + bytes_to_write > expected_byte_count {
let remaining_bytes = expected_byte_count - decoded.len();
let remaining_values = remaining_bytes / type_size;

for _ in 0..remaining_values {
decoded.extend_from_slice(bytes_of_value);
}
if decoded.len() == expected_value_count {
break;
}

for _ in 0..run_length {
decoded.extend_from_slice(bytes_of_value);
if length == 0 {
return Err(Error::InvalidInput {
location: location!(),
source: "RLE decoding encountered a zero run length".into(),
});
}

let remaining = expected_value_count - decoded.len();
let write_len = (length as usize).min(remaining);

decoded.resize(decoded.len() + write_len, *value);
}

if decoded.len() != expected_byte_count {
if decoded.len() != expected_value_count {
return Err(Error::InvalidInput {
location: location!(),
source: format!(
"RLE decoding produced {} bytes, expected {}",
"RLE decoding produced {} values, expected {}",
decoded.len(),
expected_byte_count
expected_value_count
)
.into(),
});
Expand All @@ -520,7 +517,7 @@ impl RleMiniBlockDecompressor {
num_values,
std::any::type_name::<T>()
);
Ok(decoded)
Ok(LanceBuffer::reinterpret_vec(decoded))
}
}

Expand Down