From f7892187f25376efb22321611a9e89255f6a0abf Mon Sep 17 00:00:00 2001 From: lyang24 Date: Sat, 4 Oct 2025 23:48:48 -0700 Subject: [PATCH] feat: support general compression zstd/lz4 in blocks Signed-off-by: lyang24 --- rust/lance-encoding/src/compression.rs | 102 +++++++++++++++--- .../src/encodings/logical/primitive.rs | 57 ++++++++++ .../src/encodings/physical/block.rs | 36 +++++++ 3 files changed, 179 insertions(+), 16 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 09107d312ac..36afb6cab7e 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -54,6 +54,7 @@ use crate::{ ProtobufUtils21, }, statistics::{GetStat, Stat}, + version::LanceFileVersion, }; use arrow_array::{cast::AsArray, types::UInt64Type}; @@ -66,6 +67,9 @@ use std::str::FromStr; /// RLE is chosen when the run count is less than this fraction of total values. const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5; +// Minimum block size (32kb) to trigger general block compression +const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024; + /// Trait for compression algorithms that compress an entire block of data into one opaque /// and self-described chunk. /// @@ -125,6 +129,8 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug { pub struct DefaultCompressionStrategy { /// User-configured compression parameters params: CompressionParams, + /// The lance file version for compatibilities. + version: LanceFileVersion, } fn try_bss_for_mini_block( @@ -246,6 +252,34 @@ fn maybe_wrap_general_for_mini_block( } } +fn try_general_compression( + version: LanceFileVersion, + field_params: &CompressionFieldParams, + data: &DataBlock, +) -> Result, CompressionConfig)>> { + // User-requested compression (unused today but perhaps still used + // in the future someday) + if let Some(compression_scheme) = &field_params.compression { + if compression_scheme != "none" && version >= LanceFileVersion::V2_2 { + let scheme: CompressionScheme = compression_scheme.parse()?; + let config = CompressionConfig::new(scheme, field_params.compression_level); + let compressor = Box::new(CompressedBufferEncoder::try_new(config)?); + return Ok(Some((compressor, config))); + } + } + + // Automatic compression for large blocks + if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION + && version >= LanceFileVersion::V2_2 + { + let compressor = Box::new(CompressedBufferEncoder::default()); + let config = compressor.compressor.config(); + return Ok(Some((compressor, config))); + } + + Ok(None) +} + impl DefaultCompressionStrategy { /// Create a new compression strategy with default behavior pub fn new() -> Self { @@ -254,7 +288,10 @@ impl DefaultCompressionStrategy { /// Create a new compression strategy with user-configured parameters pub fn with_params(params: CompressionParams) -> Self { - Self { params } + Self { + params, + version: LanceFileVersion::default(), + } } /// Parse compression parameters from field metadata @@ -357,14 +394,10 @@ impl DefaultCompressionStrategy { Ok(base_encoder) } -} -impl CompressionStrategy for DefaultCompressionStrategy { - fn create_miniblock_compressor( - &self, - field: &Field, - data: &DataBlock, - ) -> Result> { + /// Merge user-configured parameters with field metadata + /// Field metadata has highest priority + fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams { let mut field_params = self .params .get_field_params(&field.name, &field.data_type()); @@ -373,6 +406,18 @@ impl CompressionStrategy for DefaultCompressionStrategy { let metadata_params = Self::parse_field_metadata(field); field_params.merge(&metadata_params); + field_params + } +} + +impl CompressionStrategy for DefaultCompressionStrategy { + fn create_miniblock_compressor( + &self, + field: &Field, + data: &DataBlock, + ) -> Result> { + let field_params = self.get_merged_field_params(field); + match data { DataBlock::FixedWidth(fixed_width_data) => { self.build_fixed_width_compressor(&field_params, fixed_width_data) @@ -475,20 +520,44 @@ impl CompressionStrategy for DefaultCompressionStrategy { field: &Field, data: &DataBlock, ) -> Result<(Box, CompressiveEncoding)> { + let field_params = self.get_merged_field_params(field); + match data { - // Currently, block compression is used for rep/def (which is fixed width) and for dictionary - // encoding (which could be fixed width or variable width). DataBlock::FixedWidth(fixed_width) => { if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) { return Ok((compressor, encoding)); } - // Default to uncompressed + // Try general compression (user-requested or automatic over MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION) + if let Some((compressor, config)) = + try_general_compression(self.version, &field_params, data)? + { + let encoding = ProtobufUtils21::wrapped( + config, + ProtobufUtils21::flat(fixed_width.bits_per_value, None), + )?; + return Ok((compressor, encoding)); + } + let encoder = Box::new(ValueEncoder::default()); let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None); Ok((encoder, encoding)) } DataBlock::VariableWidth(variable_width) => { + // Try general compression + if let Some((compressor, config)) = + try_general_compression(self.version, &field_params, data)? + { + let encoding = ProtobufUtils21::wrapped( + config, + ProtobufUtils21::variable( + ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None), + None, + ), + )?; + return Ok((compressor, encoding)); + } + let encoder = Box::new(VariableEncoder::default()); let encoding = ProtobufUtils21::variable( ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None), @@ -496,11 +565,7 @@ impl CompressionStrategy for DefaultCompressionStrategy { ); Ok((encoder, encoding)) } - _ => todo!( - "block compressor for field {:?} and block type {:?}", - field, - data.name() - ), + _ => unreachable!(), } } } @@ -756,6 +821,11 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { out_of_line.uncompressed_bits_per_value, ))) } + Compression::General(general) => { + let compression = general.compression.as_ref().unwrap(); + let scheme = compression.scheme(); + Ok(Box::new(CompressedBufferEncoder::from_scheme(scheme)?)) + } _ => todo!(), } } diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 71272aea6d7..415dee0aba5 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -4401,9 +4401,12 @@ impl FieldEncoder for PrimitiveStructuralEncoder { #[allow(clippy::single_range_in_vec_init)] mod tests { use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK}; + use crate::decoder::PageEncoding; use crate::encodings::logical::primitive::{ ChunkDrainInstructions, PrimitiveStructuralEncoder, }; + use crate::format::pb21; + use crate::format::pb21::compressive_encoding::Compression; use crate::testing::{check_round_trip_encoding_of_data, TestCases}; use crate::version::LanceFileVersion; use arrow_array::{ArrayRef, Int8Array, StringArray}; @@ -5382,4 +5385,58 @@ mod tests { check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await } + + #[tokio::test] + async fn test_large_dictionary_general_compression() { + use arrow_array::{ArrayRef, StringArray}; + use std::collections::HashMap; + use std::sync::Arc; + + // Create large string dictionary data (>32KiB) with low cardinality + // Use 100 unique strings, each 500 bytes long = 50KB dictionary + let unique_values: Vec = (0..100) + .map(|i| format!("value_{:04}_{}", i, "x".repeat(500))) + .collect(); + + // Repeat these strings many times to create a large array + let repeated_strings: Vec<_> = unique_values + .iter() + .cycle() + .take(100_000) + .map(|s| Some(s.as_str())) + .collect(); + + let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef; + + // Configure test to use V2_2 and verify encoding + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| { + assert_eq!(cols.len(), 1); + let col = &cols[0]; + + // Navigate to the dictionary encoding in the page layout + if let Some(PageEncoding::Structural(page_layout)) = &col.final_pages.first().map(|p| &p.description) { + // Check that dictionary is wrapped with general compression + if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) = &page_layout.layout { + if let Some(dictionary_encoding) = &mini_block.dictionary { + match dictionary_encoding.compression.as_ref() { + Some(Compression::General(general)) => { + // Verify it's using LZ4 or Zstd + let compression = general.compression.as_ref().unwrap(); + assert!( + compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmLz4 + || compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmZstd, + "Expected LZ4 or Zstd compression for large dictionary" + ); + } + _ => panic!("Expected General compression for large dictionary"), + } + } + } + } + })); + + check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await; + } } diff --git a/rust/lance-encoding/src/encodings/physical/block.rs b/rust/lance-encoding/src/encodings/physical/block.rs index e57bec540d8..ec22e968428 100644 --- a/rust/lance-encoding/src/encodings/physical/block.rs +++ b/rust/lance-encoding/src/encodings/physical/block.rs @@ -27,6 +27,8 @@ use snafu::location; use std::str::FromStr; +use crate::compression::{BlockCompressor, BlockDecompressor}; +use crate::encodings::physical::binary::{BinaryBlockDecompressor, VariableEncoder}; use crate::format::pb21::{self, CompressiveEncoding}; use crate::format::ProtobufUtils21; use crate::{ @@ -544,6 +546,40 @@ impl VariablePerValueDecompressor for CompressedBufferEncoder { } } +impl BlockCompressor for CompressedBufferEncoder { + fn compress(&self, data: DataBlock) -> Result { + let encoded = match data { + DataBlock::FixedWidth(fixed_width) => fixed_width.data, + DataBlock::VariableWidth(variable_width) => { + // Wrap VariableEncoder to handle the encoding + let encoder = VariableEncoder::default(); + BlockCompressor::compress(&encoder, DataBlock::VariableWidth(variable_width))? + } + _ => { + return Err(Error::InvalidInput { + source: "Unsupported data block type".into(), + location: location!(), + }) + } + }; + + let mut compressed = Vec::new(); + self.compressor.compress(&encoded, &mut compressed)?; + Ok(LanceBuffer::from(compressed)) + } +} + +impl BlockDecompressor for CompressedBufferEncoder { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + let mut decompressed = Vec::new(); + self.compressor.decompress(&data, &mut decompressed)?; + + // Delegate to BinaryBlockDecompressor which handles the inline metadata + let inner_decoder = BinaryBlockDecompressor::default(); + inner_decoder.decompress(LanceBuffer::from(decompressed), num_values) + } +} + #[cfg(test)] mod tests { use super::*;