Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 86 additions & 16 deletions rust/lance-encoding/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::{
ProtobufUtils21,
},
statistics::{GetStat, Stat},
version::LanceFileVersion,
};

use arrow_array::{cast::AsArray, types::UInt64Type};
Expand All @@ -66,6 +67,9 @@ use std::str::FromStr;
/// RLE is chosen when the run count is less than this fraction of total values.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Minimum block size (32 KiB) required to trigger automatic general block compression
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;

/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
Expand Down Expand Up @@ -125,6 +129,8 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters
    params: CompressionParams,
    /// Lance file version, used to gate newer encodings (e.g. general block
    /// compression requires >= 2.2) for backwards compatibility with readers.
    version: LanceFileVersion,
}

fn try_bss_for_mini_block(
Expand Down Expand Up @@ -246,6 +252,34 @@ fn maybe_wrap_general_for_mini_block(
}
}

/// Decide whether a block should be wrapped in general-purpose compression.
///
/// Returns `Ok(Some(...))` with a block compressor and its config when either
/// the user explicitly requested a scheme via field params, or the block is
/// large enough to benefit from automatic compression. Returns `Ok(None)`
/// when no general compression should be applied.
fn try_general_compression(
    version: LanceFileVersion,
    field_params: &CompressionFieldParams,
    data: &DataBlock,
) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
    // General compression is only readable by file version 2.2+; older
    // versions never get a general compressor.
    if version < LanceFileVersion::V2_2 {
        return Ok(None);
    }

    match &field_params.compression {
        // Honor an explicitly requested scheme first (currently unused, but
        // kept for potential future use).
        // NOTE(review): a user-requested "none" only skips this branch — large
        // blocks still fall through to automatic compression below. Confirm
        // that is the intended meaning of "none".
        Some(scheme_name) if scheme_name != "none" => {
            let scheme: CompressionScheme = scheme_name.parse()?;
            let config = CompressionConfig::new(scheme, field_params.compression_level);
            let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
            Ok(Some((compressor, config)))
        }
        // Automatic compression for sufficiently large blocks.
        _ if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION => {
            let compressor = Box::new(CompressedBufferEncoder::default());
            let config = compressor.compressor.config();
            Ok(Some((compressor, config)))
        }
        _ => Ok(None),
    }
}

impl DefaultCompressionStrategy {
/// Create a new compression strategy with default behavior
pub fn new() -> Self {
Expand All @@ -254,7 +288,10 @@ impl DefaultCompressionStrategy {

/// Create a new compression strategy from user-supplied parameters,
/// using the default Lance file version for compatibility gating.
pub fn with_params(params: CompressionParams) -> Self {
    let version = LanceFileVersion::default();
    Self { params, version }
}

/// Parse compression parameters from field metadata
Expand Down Expand Up @@ -357,14 +394,10 @@ impl DefaultCompressionStrategy {

Ok(base_encoder)
}
}

impl CompressionStrategy for DefaultCompressionStrategy {
fn create_miniblock_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>> {
/// Merge user-configured parameters with field metadata
/// Field metadata has highest priority
fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
let mut field_params = self
.params
.get_field_params(&field.name, &field.data_type());
Expand All @@ -373,6 +406,18 @@ impl CompressionStrategy for DefaultCompressionStrategy {
let metadata_params = Self::parse_field_metadata(field);
field_params.merge(&metadata_params);

field_params
}
}

impl CompressionStrategy for DefaultCompressionStrategy {
fn create_miniblock_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>> {
let field_params = self.get_merged_field_params(field);

match data {
DataBlock::FixedWidth(fixed_width_data) => {
self.build_fixed_width_compressor(&field_params, fixed_width_data)
Expand Down Expand Up @@ -475,32 +520,52 @@ impl CompressionStrategy for DefaultCompressionStrategy {
field: &Field,
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
let field_params = self.get_merged_field_params(field);

match data {
// Currently, block compression is used for rep/def (which is fixed width) and for dictionary
// encoding (which could be fixed width or variable width).
DataBlock::FixedWidth(fixed_width) => {
if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
return Ok((compressor, encoding));
}

// Default to uncompressed
// Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION)
if let Some((compressor, config)) =
try_general_compression(self.version, &field_params, data)?
{
let encoding = ProtobufUtils21::wrapped(
config,
ProtobufUtils21::flat(fixed_width.bits_per_value, None),
)?;
return Ok((compressor, encoding));
}

let encoder = Box::new(ValueEncoder::default());
let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
Ok((encoder, encoding))
}
DataBlock::VariableWidth(variable_width) => {
// Try general compression
if let Some((compressor, config)) =
try_general_compression(self.version, &field_params, data)?
{
let encoding = ProtobufUtils21::wrapped(
config,
ProtobufUtils21::variable(
ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
None,
),
)?;
return Ok((compressor, encoding));
}

let encoder = Box::new(VariableEncoder::default());
let encoding = ProtobufUtils21::variable(
ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
None,
);
Ok((encoder, encoding))
}
_ => todo!(
"block compressor for field {:?} and block type {:?}",
field,
data.name()
),
_ => unreachable!(),
}
}
}
Expand Down Expand Up @@ -756,6 +821,11 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {
out_of_line.uncompressed_bits_per_value,
)))
}
Compression::General(general) => {
let compression = general.compression.as_ref().unwrap();
let scheme = compression.scheme();
Ok(Box::new(CompressedBufferEncoder::from_scheme(scheme)?))
}
_ => todo!(),
}
}
Expand Down
57 changes: 57 additions & 0 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4401,9 +4401,12 @@ impl FieldEncoder for PrimitiveStructuralEncoder {
#[allow(clippy::single_range_in_vec_init)]
mod tests {
use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
use crate::decoder::PageEncoding;
use crate::encodings::logical::primitive::{
ChunkDrainInstructions, PrimitiveStructuralEncoder,
};
use crate::format::pb21;
use crate::format::pb21::compressive_encoding::Compression;
use crate::testing::{check_round_trip_encoding_of_data, TestCases};
use crate::version::LanceFileVersion;
use arrow_array::{ArrayRef, Int8Array, StringArray};
Expand Down Expand Up @@ -5382,4 +5385,58 @@ mod tests {

check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
}

#[tokio::test]
async fn test_large_dictionary_general_compression() {
    use arrow_array::{ArrayRef, StringArray};
    use std::collections::HashMap;
    use std::sync::Arc;

    // Create large string dictionary data (>32KiB) with low cardinality.
    // 100 unique strings of ~500 bytes each puts the dictionary (~50KB)
    // above MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION.
    let unique_values: Vec<String> = (0..100)
        .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
        .collect();

    // Repeat these strings many times so the encoder chooses dictionary
    // encoding (low cardinality relative to 100k rows).
    let repeated_strings: Vec<_> = unique_values
        .iter()
        .cycle()
        .take(100_000)
        .map(|s| Some(s.as_str()))
        .collect();

    let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

    // Configure the round-trip to write with V2_2 (general compression is
    // gated on >= 2.2) and inspect the resulting page encoding.
    // NOTE(review): the nested `if let` chain below silently skips the
    // assertion when the layout shape differs (e.g. not a mini-block layout
    // or no dictionary) — consider panicking in those cases so a regression
    // can't pass unnoticed.
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];

            // Navigate to the dictionary encoding in the page layout
            if let Some(PageEncoding::Structural(page_layout)) = &col.final_pages.first().map(|p| &p.description) {
                // Check that dictionary is wrapped with general compression
                if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) = &page_layout.layout {
                    if let Some(dictionary_encoding) = &mini_block.dictionary {
                        match dictionary_encoding.compression.as_ref() {
                            Some(Compression::General(general)) => {
                                // Verify it's using LZ4 or Zstd (the schemes
                                // the default general compressor may pick)
                                let compression = general.compression.as_ref().unwrap();
                                assert!(
                                    compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmLz4
                                        || compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                    "Expected LZ4 or Zstd compression for large dictionary"
                                );
                            }
                            _ => panic!("Expected General compression for large dictionary"),
                        }
                    }
                }
            }
        }));

    check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
}
}
36 changes: 36 additions & 0 deletions rust/lance-encoding/src/encodings/physical/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use snafu::location;

use std::str::FromStr;

use crate::compression::{BlockCompressor, BlockDecompressor};
use crate::encodings::physical::binary::{BinaryBlockDecompressor, VariableEncoder};
use crate::format::pb21::{self, CompressiveEncoding};
use crate::format::ProtobufUtils21;
use crate::{
Expand Down Expand Up @@ -544,6 +546,40 @@ impl VariablePerValueDecompressor for CompressedBufferEncoder {
}
}

impl BlockCompressor for CompressedBufferEncoder {
    /// Serialize the block to a flat byte buffer, then apply the configured
    /// general-purpose compressor over those bytes.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        // Step 1: flatten the block into contiguous bytes.
        let raw = match data {
            // Fixed-width data is already a single flat buffer.
            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
            // Variable-width data needs offsets inlined; delegate to the
            // standard variable-width block encoder for that.
            DataBlock::VariableWidth(variable_width) => BlockCompressor::compress(
                &VariableEncoder::default(),
                DataBlock::VariableWidth(variable_width),
            )?,
            _ => {
                return Err(Error::InvalidInput {
                    source: "Unsupported data block type".into(),
                    location: location!(),
                })
            }
        };

        // Step 2: run the opaque compressor (e.g. LZ4/Zstd) over the bytes.
        let mut out = Vec::new();
        self.compressor.compress(&raw, &mut out)?;
        Ok(LanceBuffer::from(out))
    }
}

impl BlockDecompressor for CompressedBufferEncoder {
    /// Invert `BlockCompressor::compress`: undo the general-purpose
    /// compression, then decode the resulting bytes back into a data block.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        let mut decompressed = Vec::new();
        self.compressor.decompress(&data, &mut decompressed)?;

        // Delegate to BinaryBlockDecompressor which handles the inline metadata
        // NOTE(review): compress() also accepts FixedWidth blocks (raw bytes),
        // but this path always decodes through the variable-width (binary)
        // decompressor — confirm fixed-width blocks are decompressed via a
        // different decompressor, or this round-trip is asymmetric.
        let inner_decoder = BinaryBlockDecompressor::default();
        inner_decoder.decompress(LanceBuffer::from(decompressed), num_values)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading