diff --git a/protos/encodings_v2_1.proto b/protos/encodings_v2_1.proto index f2efc9c44cd..2e79ed40216 100644 --- a/protos/encodings_v2_1.proto +++ b/protos/encodings_v2_1.proto @@ -112,6 +112,9 @@ message MiniBlockLayout { // The page already records how many rows are in the page. For mini-block we also need to know how // many "items" are in the page. A row and an item are the same thing unless the page has lists. uint64 num_items = 9; + + // Set by Lance 2.2+ writers: chunk metadata words and per-buffer sizes are stored as u32 instead of u16, so a chunk may exceed the 32KiB limit of Lance 2.1 + bool has_large_chunk = 10; } // A layout used for pages where the data is large diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 9e2e9dd61ba..16d3faa9d97 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -45,13 +45,6 @@ const PRIMITIVE_TYPES: &[DataType] = &[ // schema doesn't yet parse them in the context of a fixed size list. const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[DataType::Int8, DataType::Float32]; -const ENCODING_OPTIONS: EncodingOptions = EncodingOptions { - cache_bytes_per_column: 8 * 1024 * 1024, - max_page_bytes: 32 * 1024 * 1024, - keep_original_array: true, - buffer_alignment: 64, -}; - fn bench_decode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); @@ -73,7 +66,7 @@ fn bench_decode(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &EncodingOptions::default(), )) .unwrap(); @@ -138,7 +131,7 @@ fn bench_decode_fsl(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { @@ -204,7 +197,7 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { @@ -279,7 +272,7 @@ fn bench_decode_packed_struct(c: &mut Criterion) 
{ &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &EncodingOptions::default(), )) .unwrap(); @@ -336,7 +329,7 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 1e7ca8a442d..bf106d9aa30 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -307,7 +307,7 @@ impl DefaultCompressionStrategy { } /// Parse compression parameters from field metadata - fn parse_field_metadata(field: &Field) -> CompressionFieldParams { + fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams { let mut params = CompressionFieldParams::default(); // Parse compression method @@ -335,6 +335,27 @@ impl DefaultCompressionStrategy { } } + // Parse minichunk size + if let Some(minichunk_size_str) = field + .metadata + .get(super::constants::MINICHUNK_SIZE_META_KEY) + { + if let Ok(minichunk_size) = minichunk_size_str.parse::() { + // for Lance v2.1 and earlier, only sizes strictly smaller than 32KiB are accepted; anything larger is ignored and the default is used + if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 { + log::warn!( + "minichunk_size '{}' too large for version '{}', using default", + minichunk_size, + version + ); + } else { + params.minichunk_size = Some(minichunk_size); + } + } else { + log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str); + } + } + params } @@ -377,12 +398,12 @@ impl DefaultCompressionStrategy { // 1. Check for explicit "none" compression if params.compression.as_deref() == Some("none") { - return Ok(Box::new(BinaryMiniBlockEncoder::default())); + return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))); } // 2. 
Check for explicit "fsst" compression if params.compression.as_deref() == Some("fsst") { - return Ok(Box::new(FsstMiniBlockEncoder::default())); + return Ok(Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))); } // 3. Choose base encoder (FSST or Binary) based on data characteristics @@ -390,9 +411,9 @@ impl DefaultCompressionStrategy { >= FSST_LEAST_INPUT_MAX_LENGTH && data_size >= FSST_LEAST_INPUT_SIZE as u64 { - Box::new(FsstMiniBlockEncoder::default()) + Box::new(FsstMiniBlockEncoder::new(params.minichunk_size)) } else { - Box::new(BinaryMiniBlockEncoder::default()) + Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)) }; // 4. Apply general compression if configured @@ -415,7 +436,7 @@ impl DefaultCompressionStrategy { .get_field_params(&field.name, &field.data_type()); // Override with field metadata if present (highest priority) - let metadata_params = Self::parse_field_metadata(field); + let metadata_params = Self::parse_field_metadata(field, &self.version); field_params.merge(&metadata_params); field_params @@ -1105,6 +1126,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE + minichunk_size: None, }, ); @@ -1136,6 +1158,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Off), // Disable BSS to test RLE + minichunk_size: None, }, ); @@ -1259,6 +1282,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(6), bss: None, + minichunk_size: None, }, ); @@ -1401,6 +1425,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/compression_config.rs b/rust/lance-encoding/src/compression_config.rs index d8364bc9fc2..4aee75b2104 100644 --- a/rust/lance-encoding/src/compression_config.rs +++ b/rust/lance-encoding/src/compression_config.rs @@ -67,6 +67,9 @@ pub struct 
CompressionFieldParams { /// Byte stream split mode for floating point data pub bss: Option, + + /// Minichunk size threshold for encoding + pub minichunk_size: Option, } impl CompressionParams { @@ -131,6 +134,9 @@ impl CompressionFieldParams { if other.bss.is_some() { self.bss = other.bss; } + if other.minichunk_size.is_some() { + self.minichunk_size = other.minichunk_size; + } } } @@ -197,6 +203,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::On), + minichunk_size: None, }; params.merge(&other); @@ -210,6 +217,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Auto), + minichunk_size: None, }; params.merge(&another); @@ -241,6 +249,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/constants.rs b/rust/lance-encoding/src/constants.rs index fc467e2be63..173b1a1c085 100644 --- a/rust/lance-encoding/src/constants.rs +++ b/rust/lance-encoding/src/constants.rs @@ -13,6 +13,8 @@ pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression"; pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level"; /// Metadata key for specifying RLE (Run-Length Encoding) threshold pub const RLE_THRESHOLD_META_KEY: &str = "lance-encoding:rle-threshold"; +/// Metadata key for specifying minichunk size +pub const MINICHUNK_SIZE_META_KEY: &str = "lance-encoding:minichunk-size"; // Dictionary encoding metadata keys /// Metadata key for specifying dictionary encoding threshold divisor diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index ee9fc36697d..230e591501f 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -234,6 +234,9 @@ pub struct EncodingOptions { /// The encoder needs to know this so it figures the position of out-of-line /// buffers correctly pub buffer_alignment: 
u64, + + /// The Lance file version being written + pub version: LanceFileVersion, } impl Default for EncodingOptions { @@ -243,10 +246,20 @@ impl Default for EncodingOptions { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, + version: LanceFileVersion::default(), } } } +impl EncodingOptions { + /// If true (for Lance file version 2.2+), miniblock chunk sizes are u32, + /// to allow storing larger chunks and their sizes for better compression. + /// For Lance file version 2.1, miniblock chunk sizes are u16. + pub fn support_large_chunk(&self) -> bool { + self.version >= LanceFileVersion::V2_2 + } +} + /// A trait to pick which kind of field encoding to use for a field /// /// Unlike the ArrayEncodingStrategy, the field encoding strategy is @@ -758,6 +771,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 35d5c7c40ad..b83be5f520a 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -26,6 +26,7 @@ use crate::{ use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray}; use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer}; use arrow_schema::{DataType, Field as ArrowField}; +use bytes::Bytes; use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt}; use itertools::Itertools; use lance_arrow::deepcopy::deep_copy_nulls; @@ -154,6 +155,7 @@ struct DecodeMiniBlockTask { num_buffers: u64, max_visible_level: u16, instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>, + has_large_chunk: bool, } impl DecodeMiniBlockTask { @@ -425,6 +427,28 @@ impl DecodeMiniBlockTask { } } + // read `num_buffers` buffer sizes from `buf` starting at `offset` + fn read_buffer_sizes( + buf: &[u8], + offset: &mut 
usize, + num_buffers: u64, + ) -> Vec { + let read_size = if LARGE { 4 } else { 2 }; + (0..num_buffers) + .map(|_| { + let bytes = &buf[*offset..*offset + read_size]; + let size = if LARGE { + u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + } else { + // the buffer size is read from u16 but is stored as u32 after decoding for consistency + u16::from_le_bytes([bytes[0], bytes[1]]) as u32 + }; + *offset += read_size; + size + }) + .collect() + } + // Unserialize a miniblock into a collection of vectors fn decode_miniblock_chunk( &self, @@ -449,13 +473,12 @@ impl DecodeMiniBlockTask { } else { None }; - let buffer_sizes = (0..self.num_buffers) - .map(|_| { - let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]); - offset += 2; - size - }) - .collect::>(); + + let buffer_sizes = if self.has_large_chunk { + Self::read_buffer_sizes::(buf, &mut offset, self.num_buffers) + } else { + Self::read_buffer_sizes::(buf, &mut offset, self.num_buffers) + }; offset += pad_bytes::(offset); @@ -664,6 +687,7 @@ struct MiniBlockDecoder { num_rows: u64, num_buffers: u64, dictionary: Option>, + has_large_chunk: bool, } /// See [`MiniBlockScheduler`] for more details on the scheduling and decoding @@ -711,6 +735,7 @@ impl StructuralPageDecoder for MiniBlockDecoder { def_meaning: self.def_meaning.clone(), num_buffers: self.num_buffers, max_visible_level, + has_large_chunk: self.has_large_chunk, })) } @@ -1212,6 +1237,7 @@ pub struct MiniBlockScheduler { dictionary: Option, // This is set after initialization page_meta: Option>, + has_large_chunk: bool, } impl MiniBlockScheduler { @@ -1297,6 +1323,7 @@ impl MiniBlockScheduler { dictionary, def_meaning: def_meaning.into(), page_meta: None, + has_large_chunk: layout.has_large_chunk, }) } @@ -1622,6 +1649,54 @@ impl ChunkInstructions { } } +enum Words { + U16(ScalarBuffer), + U32(ScalarBuffer), +} + +struct WordsIter<'a> { + iter: Box + 'a>, +} + +impl Words { + pub fn len(&self) -> usize { + match self { + Self::U16(b) 
=> b.len(), + Self::U32(b) => b.len(), + } + } + + pub fn iter(&self) -> WordsIter<'_> { + match self { + Self::U16(buf) => WordsIter { + iter: Box::new(buf.iter().map(|&x| x as u32)), + }, + Self::U32(buf) => WordsIter { + iter: Box::new(buf.iter().copied()), + }, + } + } + + pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result { + let bytes_per_value = if has_large_chunk { 4 } else { 2 }; + assert_eq!(bytes.len() % bytes_per_value, 0); + let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64); + if has_large_chunk { + Ok(Self::U32(buffer.borrow_to_typed_slice::())) + } else { + Ok(Self::U16(buffer.borrow_to_typed_slice::())) + } + } +} + +impl<'a> Iterator for WordsIter<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + impl StructuralPageScheduler for MiniBlockScheduler { fn initialize<'a>( &'a mut self, @@ -1661,11 +1736,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let rep_index_bytes = buffers.next(); // Parse the metadata and build the chunk meta - assert!(meta_bytes.len() % 2 == 0); - let bytes = LanceBuffer::from_bytes(meta_bytes, 2); - let words = bytes.borrow_to_typed_slice::(); - let words = words.as_ref(); - + let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?; let mut chunk_meta = Vec::with_capacity(words.len()); let mut rows_counter = 0; @@ -1775,6 +1846,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let def_decompressor = self.def_decompressor.clone(); let value_decompressor = self.value_decompressor.clone(); let num_buffers = self.num_buffers; + let has_large_chunk = self.has_large_chunk; let dictionary = page_meta .dictionary .as_ref() @@ -1798,6 +1870,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { dictionary, num_rows, num_buffers, + has_large_chunk, }) as Box) } .boxed(); @@ -3339,6 +3412,7 @@ pub struct PrimitiveStructuralEncoder { accumulation_queue: AccumulationQueue, keep_original_array: bool, + support_large_chunk: bool, 
accumulated_repdefs: Vec, // The compression strategy we will use to compress the data compression_strategy: Arc, @@ -3378,6 +3452,7 @@ impl PrimitiveStructuralEncoder { column_index, options.keep_original_array, ), + support_large_chunk: options.support_large_chunk(), keep_original_array: options.keep_original_array, accumulated_repdefs: Vec::new(), column_index, @@ -3481,6 +3556,7 @@ impl PrimitiveStructuralEncoder { miniblocks: MiniBlockCompressed, rep: Option>, def: Option>, + support_large_chunk: bool, ) -> SerializedMiniBlockPage { let bytes_rep = rep .as_ref() @@ -3501,7 +3577,8 @@ impl PrimitiveStructuralEncoder { // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer let max_extra = 9 * num_buffers; let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra); - let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2); + let chunk_size_bytes = if support_large_chunk { 4 } else { 2 }; + let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes); let mut rep_iter = rep.map(|r| r.into_iter()); let mut def_iter = def.map(|d| d.into_iter()); @@ -3532,9 +3609,14 @@ impl PrimitiveStructuralEncoder { data_buffer.extend_from_slice(&bytes_def.to_le_bytes()); } - for buffer_size in &chunk.buffer_sizes { - let bytes = *buffer_size; - data_buffer.extend_from_slice(&bytes.to_le_bytes()); + if support_large_chunk { + for &buffer_size in &chunk.buffer_sizes { + data_buffer.extend_from_slice(&buffer_size.to_le_bytes()); + } + } else { + for &buffer_size in &chunk.buffer_sizes { + data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes()); + } } // Pad @@ -3566,17 +3648,28 @@ impl PrimitiveStructuralEncoder { } let chunk_bytes = data_buffer.len() - start_pos; - assert!(chunk_bytes <= 32 * 1024); + let max_chunk_size = if support_large_chunk { + 2 * 1024 * 1024 * 1024 // 2GiB limit with u32 metadata: 4 bits hold log_num_values, leaving 28 bits for the count of 8-byte words + } else { + 32 * 1024 // 32KiB limit with u16 metadata + }; + 
assert!(chunk_bytes <= max_chunk_size); assert!(chunk_bytes > 0); assert_eq!(chunk_bytes % 8, 0); + // 4Ki values max + assert!(chunk.log_num_values <= 12); // We subtract 1 here from chunk_bytes because we want to be able to express // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with // 0xFFF let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT; let divided_bytes_minus_one = (divided_bytes - 1) as u64; - let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16; - meta_buffer.extend_from_slice(&metadata.to_le_bytes()); + let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64; + if support_large_chunk { + meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes()); + } else { + meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes()); + } } let data_buffer = LanceBuffer::from(data_buffer); @@ -3771,6 +3864,7 @@ impl PrimitiveStructuralEncoder { row_number: u64, dictionary_data: Option, num_rows: u64, + support_large_chunk: bool, ) -> Result { let repdef = RepDefBuilder::serialize(repdefs); @@ -3831,7 +3925,8 @@ impl PrimitiveStructuralEncoder { .as_mut() .map(|cd| std::mem::take(&mut cd.data)); - let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data); + let serialized = + Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk); // Metadata, Data, Dictionary, (maybe) Repetition Index let mut data = Vec::with_capacity(4); @@ -3861,6 +3956,7 @@ impl PrimitiveStructuralEncoder { Some((dictionary_encoding, num_dictionary_items)), &repdef.def_meaning, num_items, + support_large_chunk, ); Ok(EncodedPage { num_rows, @@ -3879,6 +3975,7 @@ impl PrimitiveStructuralEncoder { None, &repdef.def_meaning, num_items, + support_large_chunk, ); if let Some(rep_index) = rep_index { @@ -4294,6 +4391,7 @@ impl PrimitiveStructuralEncoder { let compression_strategy = self.compression_strategy.clone(); let field = self.field.clone(); let 
encoding_metadata = self.encoding_metadata.clone(); + let support_large_chunk = self.support_large_chunk; let task = spawn_cpu(move || { let num_values = arrays.iter().map(|arr| arr.len() as u64).sum(); @@ -4384,7 +4482,8 @@ impl PrimitiveStructuralEncoder { repdefs, row_number, Some(dictionary_data_block), - num_rows + num_rows, + support_large_chunk, ) } else if Self::should_dictionary_encode(&data_block, &field) { log::debug!( @@ -4403,6 +4502,7 @@ impl PrimitiveStructuralEncoder { row_number, Some(dictionary_data_block), num_rows, + support_large_chunk, ) } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) { log::debug!( @@ -4419,6 +4519,7 @@ impl PrimitiveStructuralEncoder { row_number, None, num_rows, + support_large_chunk, ) } else if Self::prefers_fullzip(encoding_metadata.as_ref()) { log::debug!( @@ -5523,6 +5624,67 @@ mod tests { check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await } + async fn test_minichunk_size_helper( + string_data: Vec>, + minichunk_size: u64, + file_version: LanceFileVersion, + ) { + use crate::constants::MINICHUNK_SIZE_META_KEY; + use crate::testing::{check_round_trip_encoding_of_data, TestCases}; + use arrow_array::{ArrayRef, StringArray}; + use std::sync::Arc; + + let string_array: ArrayRef = Arc::new(StringArray::from(string_data)); + + let mut metadata = HashMap::new(); + metadata.insert( + MINICHUNK_SIZE_META_KEY.to_string(), + minichunk_size.to_string(), + ); + metadata.insert( + STRUCTURAL_ENCODING_META_KEY.to_string(), + STRUCTURAL_ENCODING_MINIBLOCK.to_string(), + ); + + let test_cases = TestCases::default() + .with_min_file_version(file_version) + .with_batch_size(1000); + + check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await; + } + + #[tokio::test] + async fn test_minichunk_size_roundtrip() { + // Test that minichunk size can be configured and works correctly in round-trip encoding + let mut string_data = Vec::new(); + for i in 0..100 { + 
string_data.push(Some(format!("test_string_{}", i).repeat(50))); + } + // configure minichunk size to 64 bytes (smaller than the default 4kb) for Lance 2.1 + test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await; + } + + #[tokio::test] + async fn test_minichunk_size_128kb_v2_2() { + // Test that minichunk size can be configured to 128KB and works correctly with Lance 2.2 + let mut string_data = Vec::new(); + // create a string array of roughly 8MB (10,000 strings of ~800 bytes each) + for i in 0..10000 { + string_data.push(Some(format!("test_string_{}", i).repeat(50))); + } + test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await; + } + + #[tokio::test] + async fn test_binary_large_minichunk_size_over_max_miniblock_values() { + let mut string_data = Vec::new(); + // 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk + for i in 0..10000 { + string_data.push(Some(format!("t_{}", i))); + } + test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await; + } + #[tokio::test] async fn test_large_dictionary_general_compression() { use arrow_array::{ArrayRef, StringArray}; diff --git a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs index 408761b08c3..6da985e9ec0 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs @@ -44,8 +44,9 @@ pub struct MiniBlockCompressed { pub struct MiniBlockChunk { // The size in bytes of each buffer in the chunk. // - // The total size must be less than or equal to 8Ki - 6 (8188) - pub buffer_sizes: Vec, + // In Lance 2.1, the chunk size is limited to 32KiB, so only 16-bits are used. + // Since Lance 2.2, the chunk size uses u32 to support larger chunk size + pub buffer_sizes: Vec, // The log (base 2) of the number of values in the chunk. 
If this is the final chunk // then this should be 0 (the number of values will be calculated by subtracting the // size of all other chunks from the total size of the page) diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 36fe92e9c9d..7d3a2b34693 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -22,7 +22,7 @@ use crate::buffer::LanceBuffer; use crate::data::{BlockInfo, DataBlock, VariableWidthBlock}; use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock}; use crate::encodings::logical::primitive::miniblock::{ - MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, + MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_VALUES, }; use crate::format::pb21::compressive_encoding::Compression; use crate::format::pb21::CompressiveEncoding; @@ -31,16 +31,34 @@ use crate::format::{pb21, ProtobufUtils21}; use lance_core::utils::bit::pad_bytes_to; use lance_core::{Error, Result}; -#[derive(Debug, Default)] -pub struct BinaryMiniBlockEncoder {} +#[derive(Debug)] +pub struct BinaryMiniBlockEncoder { + minichunk_size: i64, +} + +impl Default for BinaryMiniBlockEncoder { + fn default() -> Self { + Self { + minichunk_size: *AIM_MINICHUNK_SIZE, + } + } +} -const AIM_MINICHUNK_SIZE: i64 = 4 * 1024; +const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024; + +pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE") + .unwrap_or_else(|_| DEFAULT_AIM_MINICHUNK_SIZE.to_string()) + .parse::() + .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE) +}); // Make it to support both u32 and u64 fn chunk_offsets( offsets: &[N], data: &[u8], alignment: usize, + minichunk_size: i64, ) -> (Vec, Vec) { #[derive(Debug)] struct ChunkInfo { @@ -60,7 +78,8 @@ fn chunk_offsets( let mut chunks = vec![]; let mut last_offset_in_orig_idx 
= 0; loop { - let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx); + let this_last_offset_in_orig_idx = + search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size); let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx; let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx]; @@ -83,7 +102,7 @@ fn chunk_offsets( } else { num_values_in_this_chunk.trailing_zeros() as u8 }, - buffer_sizes: vec![padded_chunk_size as u16], + buffer_sizes: vec![padded_chunk_size as u32], }); if this_last_offset_in_orig_idx == offsets.len() - 1 { break; @@ -135,7 +154,11 @@ fn chunk_offsets( // this function incrementally peek the number of values in a chunk, // each time multiplies the number of values by 2. // It returns the offset_idx in `offsets` that belongs to this chunk. -fn search_next_offset_idx(offsets: &[N], last_offset_idx: usize) -> usize { +fn search_next_offset_idx( + offsets: &[N], + last_offset_idx: usize, + minichunk_size: i64, +) -> usize { let mut num_values = 1; let mut new_num_values = num_values * 2; loop { @@ -144,7 +167,7 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us // existing bytes plus the new offset size let new_size = existing_bytes + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= minichunk_size { // case 1: can fit the rest of all data into a miniblock return offsets.len() - 1; } else { @@ -155,18 +178,28 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx]; let new_size = existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= minichunk_size { + if new_num_values * 2 > 
MAX_MINIBLOCK_VALUES as usize { + // hit the max number of values limit + break; + } num_values = new_num_values; new_num_values *= 2; } else { break; } } - last_offset_idx + new_num_values + last_offset_idx + num_values } impl BinaryMiniBlockEncoder { - // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`. + pub fn new(minichunk_size: Option) -> Self { + Self { + minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE), + } + } + + // put binary data into chunks, every chunk is less than or equal to `minichunk_size`. // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes. // the offsets in the chunk points to the bytes offset in this chunk. fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) { @@ -175,7 +208,8 @@ impl BinaryMiniBlockEncoder { match data.bits_per_offset { 32 => { let offsets = data.offsets.borrow_to_typed_slice::(); - let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4); + let (buffers, chunks) = + chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size); ( MiniBlockCompressed { data: buffers, @@ -187,7 +221,8 @@ impl BinaryMiniBlockEncoder { } 64 => { let offsets = data.offsets.borrow_to_typed_slice::(); - let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8); + let (buffers, chunks) = + chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size); ( MiniBlockCompressed { data: buffers, diff --git a/rust/lance-encoding/src/encodings/physical/bitpacking.rs b/rust/lance-encoding/src/encodings/physical/bitpacking.rs index 3efa6662431..9e22e47d745 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpacking.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpacking.rs @@ -120,7 +120,7 @@ impl InlineBitpacking { ); } chunks.push(MiniBlockChunk { - buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::()) as u16], + buffer_sizes: vec![((1 + 
*packed_chunk_size) * std::mem::size_of::()) as u32], log_num_values: LOG_ELEMS_PER_CHUNK, }); } @@ -149,7 +149,7 @@ impl InlineBitpacking { chunks.push(MiniBlockChunk { buffer_sizes: vec![ ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::()) - as u16, + as u32, ], log_num_values: 0, }); diff --git a/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs b/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs index 627317a1a9c..6b442428389 100644 --- a/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs +++ b/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs @@ -159,7 +159,7 @@ impl MiniBlockCompressor for ByteStreamSplitEncoder { debug_assert!(chunk_bytes > 0); chunks.push(MiniBlockChunk { - buffer_sizes: vec![chunk_bytes as u16], + buffer_sizes: vec![chunk_bytes as u32], log_num_values, }); diff --git a/rust/lance-encoding/src/encodings/physical/fsst.rs b/rust/lance-encoding/src/encodings/physical/fsst.rs index c74a3093b0f..25dac9b7896 100644 --- a/rust/lance-encoding/src/encodings/physical/fsst.rs +++ b/rust/lance-encoding/src/encodings/physical/fsst.rs @@ -129,7 +129,15 @@ impl FsstCompressed { } #[derive(Debug, Default)] -pub struct FsstMiniBlockEncoder {} +pub struct FsstMiniBlockEncoder { + minichunk_size: Option, +} + +impl FsstMiniBlockEncoder { + pub fn new(minichunk_size: Option) -> Self { + Self { minichunk_size } + } +} impl MiniBlockCompressor for FsstMiniBlockEncoder { fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> { @@ -138,8 +146,8 @@ impl MiniBlockCompressor for FsstMiniBlockEncoder { let data_block = DataBlock::VariableWidth(compressed.data); // compress the fsst compressed data using `BinaryMiniBlockEncoder` - let binary_compressor = - Box::new(BinaryMiniBlockEncoder::default()) as Box; + let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size)) + as Box; let (binary_miniblock_compressed, 
binary_array_encoding) = binary_compressor.compress(data_block)?; @@ -367,7 +375,6 @@ impl MiniBlockDecompressor for FsstMiniBlockDecompressor { #[cfg(test)] mod tests { - use std::collections::HashMap; use lance_datagen::{ByteCount, RowCount}; diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index eb5ff12e62a..e824c8aacfd 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -68,7 +68,7 @@ impl MiniBlockCompressor for GeneralMiniBlockCompressor { // Create new chunk with updated first buffer size let mut new_buffer_sizes = chunk.buffer_sizes.clone(); - new_buffer_sizes[0] = compressed_size as u16; + new_buffer_sizes[0] = compressed_size as u32; new_chunks.push(MiniBlockChunk { buffer_sizes: new_buffer_sizes, diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 57c7765d8f1..758e3307521 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -149,7 +149,7 @@ impl RleMiniBlockEncoder { let lengths_size = all_lengths.len() - lengths_start; let chunk = MiniBlockChunk { - buffer_sizes: vec![values_size as u16, lengths_size as u16], + buffer_sizes: vec![values_size as u32, lengths_size as u32], log_num_values, }; diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index d17275b9a4b..6c8e6dbd787 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -65,7 +65,7 @@ impl ValueEncoder { let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize); debug_assert_eq!(vals_per_chunk % values_per_word, 0); let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word); - let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap(); 
+ let bytes_per_chunk = u32::try_from(bytes_per_chunk).unwrap(); debug_assert!(bytes_per_chunk > 0); let data_buffer = data.data; @@ -86,7 +86,7 @@ impl ValueEncoder { } else if row_offset < data.num_values { // Final chunk, special values let num_bytes = data_buffer.len() as u64 - bytes_counter; - let num_bytes = u16::try_from(num_bytes).unwrap(); + let num_bytes = u32::try_from(num_bytes).unwrap(); chunks.push(MiniBlockChunk { log_num_values: 0, buffer_sizes: vec![num_bytes], @@ -147,7 +147,7 @@ impl ValueEncoder { row_offset: usize, num_rows: usize, validity_buffers: &mut [Vec], - ) -> Vec { + ) -> Vec { let mut row_offset = row_offset; let mut num_values = num_rows; let mut buffer_counter = 0; @@ -160,14 +160,14 @@ impl ValueEncoder { .clone() .bit_slice_le_with_length(row_offset, num_values); validity_buffers[buffer_counter].extend_from_slice(&validity_slice); - buffer_sizes.push(validity_slice.len() as u16); + buffer_sizes.push(validity_slice.len() as u32); buffer_counter += 1; } } let bits_in_chunk = data.bits_per_value * num_values as u64; let bytes_in_chunk = bits_in_chunk.div_ceil(8); - let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap(); + let bytes_in_chunk = u32::try_from(bytes_in_chunk).unwrap(); debug_assert!(bytes_in_chunk > 0); buffer_sizes.push(bytes_in_chunk); diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 4ef3719e7e2..30b1220826e 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -541,7 +541,8 @@ macro_rules! impl_common_protobuf_utils { )>, def_meaning: &[DefinitionInterpretation], num_items: u64, - ) -> crate::format::$module::PageLayout { + has_large_chunk: bool, + ) -> crate::format::$module::PageLayout { assert!(!def_meaning.is_empty()); let (dictionary, num_dictionary_items) = dictionary_encoding .map(|(d, i)| (Some(d), i)) @@ -562,7 +563,8 @@ macro_rules! 
impl_common_protobuf_utils { .map(|&def| Self::def_inter_to_repdef_layer(def)) .collect(), num_items, - }, + has_large_chunk, + }, ), ), } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index f6bc8cda268..9161bcef99f 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -348,6 +348,7 @@ pub async fn check_round_trip_encoding_generated( cache_bytes_per_column: page_size, keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, + version, }; encoding_strategy .create_field_encoder( @@ -736,6 +737,7 @@ pub async fn check_round_trip_encoding_of_data_with_expected( max_page_bytes: test_cases.get_max_page_size(), keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, + version: file_version, }; let encoder = encoding_strategy .create_field_encoder( diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 62ae0bfb06c..354e0f920d0 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -1720,6 +1720,7 @@ pub mod tests { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, + version, }; let encoding_strategy = default_encoding_strategy(version); diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 22dfec02b5f..4952d9476c4 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -319,6 +319,7 @@ impl FileWriter { max_page_bytes, keep_original_array, buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64, + version: self.version(), }; let encoder = BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?; @@ -1052,6 +1053,7 @@ mod tests { compression: None, // Will use default compression if any compression_level: None, bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used + minichunk_size: None, }, );