From bae23ef6e6aa6ebf2886396363cbc3e22b76c5cc Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Wed, 24 Sep 2025 22:46:18 +0800 Subject: [PATCH 1/7] Make binary mini block chunk size configurable. --- .../lance-encoding/src/encodings/physical/binary.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index e989c0c1046..69c2f513d65 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -34,7 +34,14 @@ use lance_core::{Error, Result}; #[derive(Debug, Default)] pub struct BinaryMiniBlockEncoder {} -const AIM_MINICHUNK_SIZE: i64 = 4 * 1024; +const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024; + +static AIM_MINICHUNK_SIZE: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_MINIBLOCK_CHUNK_SIZE") + .unwrap_or_else(|_| DEFAULT_AIM_MINICHUNK_SIZE.to_string()) + .parse::() + .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE) +}); // Make it to support both u32 and u64 fn chunk_offsets( @@ -144,7 +151,7 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us // existing bytes plus the new offset size let new_size = existing_bytes + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= *AIM_MINICHUNK_SIZE { // case 1: can fit the rest of all data into a miniblock return offsets.len() - 1; } else { @@ -155,7 +162,7 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx]; let new_size = existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= *AIM_MINICHUNK_SIZE { num_values = new_num_values; new_num_values *= 2; } else { From 
4739d5cfb589af22e9516e07724f19d4c4481063 Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Sun, 28 Sep 2025 14:38:46 +0800 Subject: [PATCH 2/7] Make 64kb binary aim minichunk size for binary to be possible. --- .../src/encodings/logical/primitive.rs | 13 +++++++------ .../lance-encoding/src/encodings/physical/binary.rs | 8 ++++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index a1131cc827a..c61637b3672 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -1661,9 +1661,9 @@ impl StructuralPageScheduler for MiniBlockScheduler { let rep_index_bytes = buffers.next(); // Parse the metadata and build the chunk meta - assert!(meta_bytes.len() % 2 == 0); - let bytes = LanceBuffer::from_bytes(meta_bytes, 2); - let words = bytes.borrow_to_typed_slice::(); + assert!(meta_bytes.len() % 4 == 0); + let bytes = LanceBuffer::from_bytes(meta_bytes, 4); + let words = bytes.borrow_to_typed_slice::(); let words = words.as_ref(); let mut chunk_meta = Vec::with_capacity(words.len()); @@ -3501,7 +3501,7 @@ impl PrimitiveStructuralEncoder { // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer let max_extra = 9 * num_buffers; let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra); - let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2); + let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 4); let mut rep_iter = rep.map(|r| r.into_iter()); let mut def_iter = def.map(|d| d.into_iter()); @@ -3566,16 +3566,17 @@ impl PrimitiveStructuralEncoder { } let chunk_bytes = data_buffer.len() - start_pos; - assert!(chunk_bytes <= 32 * 1024); + assert!(chunk_bytes <= 2 * 1024 * 1024 * 1024); // 2GB limit with u32 metadata assert!(chunk_bytes > 0); assert_eq!(chunk_bytes % 8, 0); +
assert!(chunk.log_num_values <= 12); // 4Ki values max // We subtract 1 here from chunk_bytes because we want to be able to express // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with // 0xFFF let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT; let divided_bytes_minus_one = (divided_bytes - 1) as u64; - let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16; + let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u32; meta_buffer.extend_from_slice(&metadata.to_le_bytes()); } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 69c2f513d65..caa7fe21de3 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -22,7 +22,7 @@ use crate::buffer::LanceBuffer; use crate::data::{BlockInfo, DataBlock, VariableWidthBlock}; use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock}; use crate::encodings::logical::primitive::miniblock::{ - MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, + MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_VALUES, }; use crate::format::pb21::compressive_encoding::Compression; use crate::format::pb21::CompressiveEncoding; @@ -163,13 +163,17 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us let new_size = existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap(); if new_size.to_i64().unwrap() <= *AIM_MINICHUNK_SIZE { + if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize { + // hit the max number of values limit + break; + } num_values = new_num_values; new_num_values *= 2; } else { break; } } - last_offset_idx + new_num_values + last_offset_idx + num_values } impl BinaryMiniBlockEncoder { From 2151dca74da287c40b0c9baef04b27de4126b9c3 Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Sun, 28 Sep 2025 14:42:50 +0800 Subject: 
[PATCH 3/7] Change miniblock chunk's buffer sizes to be u32 so that it is possible to enlarge chunk size to be over 64kb --- .../src/encodings/logical/primitive.rs | 12 +++++++++--- .../src/encodings/logical/primitive/miniblock.rs | 4 ++-- rust/lance-encoding/src/encodings/physical/binary.rs | 2 +- .../src/encodings/physical/bitpacking.rs | 4 ++-- .../src/encodings/physical/byte_stream_split.rs | 2 +- .../lance-encoding/src/encodings/physical/general.rs | 2 +- rust/lance-encoding/src/encodings/physical/rle.rs | 2 +- rust/lance-encoding/src/encodings/physical/value.rs | 10 +++++----- 8 files changed, 22 insertions(+), 16 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index c61637b3672..2e118dba40b 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -451,8 +451,13 @@ impl DecodeMiniBlockTask { }; let buffer_sizes = (0..self.num_buffers) .map(|_| { - let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]); - offset += 2; + let size = u32::from_le_bytes([ + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], + ]); + offset += 4; size }) .collect::>(); @@ -3569,7 +3574,8 @@ impl PrimitiveStructuralEncoder { assert!(chunk_bytes <= 2 * 1024 * 1024 * 1024); // 2GB limit with u32 metadata assert!(chunk_bytes > 0); assert_eq!(chunk_bytes % 8, 0); - assert!(chunk.log_num_values <= 12); // 4Ki values max + // 4Ki values max + assert!(chunk.log_num_values <= 12); // We subtract 1 here from chunk_bytes because we want to be able to express // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with // 0xFFF diff --git a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs index 408761b08c3..2e420921021 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs +++ 
b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs @@ -44,8 +44,8 @@ pub struct MiniBlockCompressed { pub struct MiniBlockChunk { // The size in bytes of each buffer in the chunk. // - // The total size must be less than or equal to 8Ki - 6 (8188) - pub buffer_sizes: Vec, + // Updated to support larger chunk sizes (up to 4GB per buffer) + pub buffer_sizes: Vec, // The log (base 2) of the number of values in the chunk. If this is the final chunk // then this should be 0 (the number of values will be calculated by subtracting the // size of all other chunks from the total size of the page) diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index caa7fe21de3..39cdc3d9a71 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -90,7 +90,7 @@ fn chunk_offsets( } else { num_values_in_this_chunk.trailing_zeros() as u8 }, - buffer_sizes: vec![padded_chunk_size as u16], + buffer_sizes: vec![padded_chunk_size as u32], }); if this_last_offset_in_orig_idx == offsets.len() - 1 { break; diff --git a/rust/lance-encoding/src/encodings/physical/bitpacking.rs b/rust/lance-encoding/src/encodings/physical/bitpacking.rs index 3efa6662431..9e22e47d745 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpacking.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpacking.rs @@ -120,7 +120,7 @@ impl InlineBitpacking { ); } chunks.push(MiniBlockChunk { - buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::()) as u16], + buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::()) as u32], log_num_values: LOG_ELEMS_PER_CHUNK, }); } @@ -149,7 +149,7 @@ impl InlineBitpacking { chunks.push(MiniBlockChunk { buffer_sizes: vec![ ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::()) - as u16, + as u32, ], log_num_values: 0, }); diff --git 
a/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs b/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs index 627317a1a9c..6b442428389 100644 --- a/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs +++ b/rust/lance-encoding/src/encodings/physical/byte_stream_split.rs @@ -159,7 +159,7 @@ impl MiniBlockCompressor for ByteStreamSplitEncoder { debug_assert!(chunk_bytes > 0); chunks.push(MiniBlockChunk { - buffer_sizes: vec![chunk_bytes as u16], + buffer_sizes: vec![chunk_bytes as u32], log_num_values, }); diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index eb5ff12e62a..e824c8aacfd 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -68,7 +68,7 @@ impl MiniBlockCompressor for GeneralMiniBlockCompressor { // Create new chunk with updated first buffer size let mut new_buffer_sizes = chunk.buffer_sizes.clone(); - new_buffer_sizes[0] = compressed_size as u16; + new_buffer_sizes[0] = compressed_size as u32; new_chunks.push(MiniBlockChunk { buffer_sizes: new_buffer_sizes, diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 8f5dcc3fa0f..8e225ef5337 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -149,7 +149,7 @@ impl RleMiniBlockEncoder { let lengths_size = all_lengths.len() - lengths_start; let chunk = MiniBlockChunk { - buffer_sizes: vec![values_size as u16, lengths_size as u16], + buffer_sizes: vec![values_size as u32, lengths_size as u32], log_num_values, }; diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index d17275b9a4b..6c8e6dbd787 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -65,7 
+65,7 @@ impl ValueEncoder { let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize); debug_assert_eq!(vals_per_chunk % values_per_word, 0); let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word); - let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap(); + let bytes_per_chunk = u32::try_from(bytes_per_chunk).unwrap(); debug_assert!(bytes_per_chunk > 0); let data_buffer = data.data; @@ -86,7 +86,7 @@ impl ValueEncoder { } else if row_offset < data.num_values { // Final chunk, special values let num_bytes = data_buffer.len() as u64 - bytes_counter; - let num_bytes = u16::try_from(num_bytes).unwrap(); + let num_bytes = u32::try_from(num_bytes).unwrap(); chunks.push(MiniBlockChunk { log_num_values: 0, buffer_sizes: vec![num_bytes], @@ -147,7 +147,7 @@ impl ValueEncoder { row_offset: usize, num_rows: usize, validity_buffers: &mut [Vec], - ) -> Vec { + ) -> Vec { let mut row_offset = row_offset; let mut num_values = num_rows; let mut buffer_counter = 0; @@ -160,14 +160,14 @@ impl ValueEncoder { .clone() .bit_slice_le_with_length(row_offset, num_values); validity_buffers[buffer_counter].extend_from_slice(&validity_slice); - buffer_sizes.push(validity_slice.len() as u16); + buffer_sizes.push(validity_slice.len() as u32); buffer_counter += 1; } } let bits_in_chunk = data.bits_per_value * num_values as u64; let bytes_in_chunk = bits_in_chunk.div_ceil(8); - let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap(); + let bytes_in_chunk = u32::try_from(bytes_in_chunk).unwrap(); debug_assert!(bytes_in_chunk > 0); buffer_sizes.push(bytes_in_chunk); From c19d9476262a3c7acddd326223b2164775800232 Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Sat, 11 Oct 2025 15:54:21 +0800 Subject: [PATCH 4/7] Add support_large_chunk for version 2.2 miniblock so that users can choose larger chunk size if needed. 
--- protos/encodings_v2_1.proto | 3 + rust/lance-encoding/benches/decoder.rs | 25 ++-- rust/lance-encoding/src/encoder.rs | 5 + .../src/encodings/logical/primitive.rs | 124 +++++++++++++++--- .../encodings/logical/primitive/miniblock.rs | 2 +- rust/lance-encoding/src/format.rs | 6 +- rust/lance-encoding/src/testing.rs | 2 + rust/lance-file/src/reader.rs | 1 + rust/lance-file/src/writer.rs | 4 + 9 files changed, 137 insertions(+), 35 deletions(-) diff --git a/protos/encodings_v2_1.proto b/protos/encodings_v2_1.proto index d264fae4ad2..b24df0db811 100644 --- a/protos/encodings_v2_1.proto +++ b/protos/encodings_v2_1.proto @@ -112,6 +112,9 @@ message MiniBlockLayout { // The page already records how many rows are in the page. For mini-block we also need to know how // many "items" are in the page. A row and an item are the same thing unless the page has lists. uint64 num_items = 9; + + // Since Lance 2.2, we support miniblocks with larger chunk size (>= 64KB) + bool support_large_chunk = 10; } // A layout used for pages where the data is large diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 9e2e9dd61ba..ef3492658bb 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -45,12 +45,15 @@ const PRIMITIVE_TYPES: &[DataType] = &[ // schema doesn't yet parse them in the context of a fixed size list. 
const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[DataType::Int8, DataType::Float32]; -const ENCODING_OPTIONS: EncodingOptions = EncodingOptions { - cache_bytes_per_column: 8 * 1024 * 1024, - max_page_bytes: 32 * 1024 * 1024, - keep_original_array: true, - buffer_alignment: 64, -}; +fn default_encoding_options(version: LanceFileVersion) -> EncodingOptions { + EncodingOptions { + cache_bytes_per_column: 8 * 1024 * 1024, + max_page_bytes: 32 * 1024 * 1024, + keep_original_array: true, + buffer_alignment: 64, + support_large_chunk: version >= LanceFileVersion::V2_2, + } +} fn bench_decode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -73,7 +76,7 @@ fn bench_decode(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &default_encoding_options(LanceFileVersion::default()), )) .unwrap(); @@ -138,7 +141,7 @@ fn bench_decode_fsl(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &default_encoding_options(LanceFileVersion::default()), )) .unwrap(); b.iter(|| { @@ -204,7 +207,7 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &default_encoding_options(LanceFileVersion::default()), )) .unwrap(); b.iter(|| { @@ -279,7 +282,7 @@ fn bench_decode_packed_struct(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &default_encoding_options(LanceFileVersion::default()), )) .unwrap(); @@ -336,7 +339,7 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &ENCODING_OPTIONS, + &default_encoding_options(LanceFileVersion::default()), )) .unwrap(); b.iter(|| { diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index ad5b8b2235f..6c1766c26b3 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -233,6 +233,10 @@ pub 
struct EncodingOptions { /// The encoder needs to know this so it figures the position of out-of-line /// buffers correctly pub buffer_alignment: u64, + /// If true (the default in Lance 2.2+), miniblock buffer sizes are u32 + /// to allow storing bigger chunk size for better compression. + /// For Lance 2.1, this is false, and miniblock buffer sizes are u16. + pub support_large_chunk: bool, } impl Default for EncodingOptions { @@ -242,6 +246,7 @@ impl Default for EncodingOptions { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, + support_large_chunk: LanceFileVersion::default() >= LanceFileVersion::V2_2, } } } diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 2e118dba40b..1421f4bc423 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -26,6 +26,7 @@ use crate::{ use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray}; use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer}; use arrow_schema::{DataType, Field as ArrowField}; +use bytes::Bytes; use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt}; use itertools::Itertools; use lance_arrow::deepcopy::deep_copy_nulls; @@ -154,6 +155,7 @@ struct DecodeMiniBlockTask { num_buffers: u64, max_visible_level: u16, instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>, + support_large_chunk: bool, } impl DecodeMiniBlockTask { @@ -449,15 +451,18 @@ impl DecodeMiniBlockTask { } else { None }; + + let read_size = if self.support_large_chunk { 4 } else { 2 }; + let read_fn: fn(&[u8]) -> u32 = if self.support_large_chunk { + |bytes| u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + } else { + |bytes| u16::from_le_bytes([bytes[0], bytes[1]]) as u32 + }; + let buffer_sizes = (0..self.num_buffers) .map(|_| { - let size = u32::from_le_bytes([ - 
buf[offset], - buf[offset + 1], - buf[offset + 2], - buf[offset + 3], - ]); - offset += 4; + let size = read_fn(&buf[offset..offset + read_size]); + offset += read_size; size }) .collect::>(); @@ -669,6 +674,7 @@ struct MiniBlockDecoder { num_rows: u64, num_buffers: u64, dictionary: Option>, + support_large_chunk: bool, } /// See [`MiniBlockScheduler`] for more details on the scheduling and decoding @@ -716,6 +722,7 @@ impl StructuralPageDecoder for MiniBlockDecoder { def_meaning: self.def_meaning.clone(), num_buffers: self.num_buffers, max_visible_level, + support_large_chunk: self.support_large_chunk, })) } @@ -1217,6 +1224,7 @@ pub struct MiniBlockScheduler { dictionary: Option, // This is set after initialization page_meta: Option>, + support_large_chunk: bool, } impl MiniBlockScheduler { @@ -1302,6 +1310,7 @@ impl MiniBlockScheduler { dictionary, def_meaning: def_meaning.into(), page_meta: None, + support_large_chunk: layout.support_large_chunk, }) } @@ -1627,6 +1636,55 @@ impl ChunkInstructions { } } +enum Words { + U16(ScalarBuffer), + U32(ScalarBuffer), +} + +enum WordsIter<'a> { + U16(std::slice::Iter<'a, u16>), + U32(std::slice::Iter<'a, u32>), +} + +impl Words { + pub fn len(&self) -> usize { + match self { + Words::U16(b) => b.len(), + Words::U32(b) => b.len(), + } + } + + pub fn iter(&self) -> WordsIter<'_> { + match self { + Words::U16(buf) => WordsIter::U16(buf.iter()), + Words::U32(buf) => WordsIter::U32(buf.iter()), + } + } + + pub fn from_bytes(bytes: Bytes, support_large_chunk: bool) -> Result { + if support_large_chunk { + assert_eq!(bytes.len() % 4, 0); + let buffer = LanceBuffer::from_bytes(bytes, 4); + Ok(Words::U32(buffer.borrow_to_typed_slice::())) + } else { + assert_eq!(bytes.len() % 2, 0); + let buffer = LanceBuffer::from_bytes(bytes, 2); + Ok(Words::U16(buffer.borrow_to_typed_slice::())) + } + } +} + +impl<'a> Iterator for WordsIter<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + match self { + WordsIter::U16(it) => 
it.next().map(|&x| x as u32), + WordsIter::U32(it) => it.next().map(|&x| x), + } + } +} + impl StructuralPageScheduler for MiniBlockScheduler { fn initialize<'a>( &'a mut self, @@ -1666,11 +1724,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let rep_index_bytes = buffers.next(); // Parse the metadata and build the chunk meta - assert!(meta_bytes.len() % 4 == 0); - let bytes = LanceBuffer::from_bytes(meta_bytes, 4); - let words = bytes.borrow_to_typed_slice::(); - let words = words.as_ref(); - + let words = Words::from_bytes(meta_bytes, self.support_large_chunk)?; let mut chunk_meta = Vec::with_capacity(words.len()); let mut rows_counter = 0; @@ -1780,6 +1834,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let def_decompressor = self.def_decompressor.clone(); let value_decompressor = self.value_decompressor.clone(); let num_buffers = self.num_buffers; + let support_large_chunk = self.support_large_chunk; let dictionary = page_meta .dictionary .as_ref() @@ -1803,6 +1858,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { dictionary, num_rows, num_buffers, + support_large_chunk, }) as Box) } .boxed(); @@ -3344,6 +3400,7 @@ pub struct PrimitiveStructuralEncoder { accumulation_queue: AccumulationQueue, keep_original_array: bool, + support_large_chunk: bool, accumulated_repdefs: Vec, // The compression strategy we will use to compress the data compression_strategy: Arc, @@ -3383,6 +3440,7 @@ impl PrimitiveStructuralEncoder { column_index, options.keep_original_array, ), + support_large_chunk: options.support_large_chunk, keep_original_array: options.keep_original_array, accumulated_repdefs: Vec::new(), column_index, @@ -3486,6 +3544,7 @@ impl PrimitiveStructuralEncoder { miniblocks: MiniBlockCompressed, rep: Option>, def: Option>, + support_large_chunk: bool, ) -> SerializedMiniBlockPage { let bytes_rep = rep .as_ref() @@ -3506,7 +3565,8 @@ impl PrimitiveStructuralEncoder { // 2 bytes for the length of each buffer and up to 7 bytes of 
padding per buffer let max_extra = 9 * num_buffers; let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra); - let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 4); + let chunk_size_bytes = if support_large_chunk { 4 } else { 2 }; + let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes); let mut rep_iter = rep.map(|r| r.into_iter()); let mut def_iter = def.map(|d| d.into_iter()); @@ -3537,9 +3597,14 @@ impl PrimitiveStructuralEncoder { data_buffer.extend_from_slice(&bytes_def.to_le_bytes()); } - for buffer_size in &chunk.buffer_sizes { - let bytes = *buffer_size; - data_buffer.extend_from_slice(&bytes.to_le_bytes()); + if support_large_chunk { + for &buffer_size in &chunk.buffer_sizes { + data_buffer.extend_from_slice(&buffer_size.to_le_bytes()); + } + } else { + for &buffer_size in &chunk.buffer_sizes { + data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes()); + } } // Pad @@ -3571,7 +3636,12 @@ impl PrimitiveStructuralEncoder { } let chunk_bytes = data_buffer.len() - start_pos; - assert!(chunk_bytes <= 2 * 1024 * 1024 * 1024); // 2GB limit with u32 metadata + let max_chunk_size = if support_large_chunk { + 2 * 1024 * 1024 * 1024 // 2GB limit with u32 metadata + } else { + 32 * 1024 // 32KiB limit with u16 metadata + }; + assert!(chunk_bytes <= max_chunk_size); assert!(chunk_bytes > 0); assert_eq!(chunk_bytes % 8, 0); // 4Ki values max @@ -3582,8 +3652,12 @@ impl PrimitiveStructuralEncoder { let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT; let divided_bytes_minus_one = (divided_bytes - 1) as u64; - let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u32; - meta_buffer.extend_from_slice(&metadata.to_le_bytes()); + let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64; + if support_large_chunk { + meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes()); + } else { + meta_buffer.extend_from_slice(&(metadata as 
u16).to_le_bytes()); + } } let data_buffer = LanceBuffer::from(data_buffer); @@ -3778,6 +3852,7 @@ impl PrimitiveStructuralEncoder { row_number: u64, dictionary_data: Option, num_rows: u64, + support_large_chunk: bool, ) -> Result { let repdef = RepDefBuilder::serialize(repdefs); @@ -3838,7 +3913,8 @@ impl PrimitiveStructuralEncoder { .as_mut() .map(|cd| std::mem::take(&mut cd.data)); - let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data); + let serialized = + Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk); // Metadata, Data, Dictionary, (maybe) Repetition Index let mut data = Vec::with_capacity(4); @@ -3868,6 +3944,7 @@ impl PrimitiveStructuralEncoder { Some((dictionary_encoding, num_dictionary_items)), &repdef.def_meaning, num_items, + support_large_chunk, ); Ok(EncodedPage { num_rows, @@ -3886,6 +3963,7 @@ impl PrimitiveStructuralEncoder { None, &repdef.def_meaning, num_items, + support_large_chunk, ); if let Some(rep_index) = rep_index { @@ -4301,6 +4379,7 @@ impl PrimitiveStructuralEncoder { let compression_strategy = self.compression_strategy.clone(); let field = self.field.clone(); let encoding_metadata = self.encoding_metadata.clone(); + let support_large_chunk = self.support_large_chunk; let task = spawn_cpu(move || { let num_values = arrays.iter().map(|arr| arr.len() as u64).sum(); @@ -4391,7 +4470,8 @@ impl PrimitiveStructuralEncoder { repdefs, row_number, Some(dictionary_data_block), - num_rows + num_rows, + support_large_chunk, ) } else if Self::should_dictionary_encode(&data_block, &field) { log::debug!( @@ -4410,6 +4490,7 @@ impl PrimitiveStructuralEncoder { row_number, Some(dictionary_data_block), num_rows, + support_large_chunk, ) } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) { log::debug!( @@ -4426,6 +4507,7 @@ impl PrimitiveStructuralEncoder { row_number, None, num_rows, + support_large_chunk, ) } else if 
Self::prefers_fullzip(encoding_metadata.as_ref()) { log::debug!( diff --git a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs index 2e420921021..7332c0604cc 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs @@ -44,7 +44,7 @@ pub struct MiniBlockCompressed { pub struct MiniBlockChunk { // The size in bytes of each buffer in the chunk. // - // Updated to support larger chunk sizes (up to 4GB per buffer) + // for Lance 2.1, the chunk size is limited to 32KiB so only 16-bit is stored here and used. pub buffer_sizes: Vec, // The log (base 2) of the number of values in the chunk. If this is the final chunk // then this should be 0 (the number of values will be calculated by subtracting the diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 4ef3719e7e2..54fc12f800f 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -541,7 +541,8 @@ macro_rules! impl_common_protobuf_utils { )>, def_meaning: &[DefinitionInterpretation], num_items: u64, - ) -> crate::format::$module::PageLayout { + support_large_chunk: bool, + ) -> crate::format::$module::PageLayout { assert!(!def_meaning.is_empty()); let (dictionary, num_dictionary_items) = dictionary_encoding .map(|(d, i)| (Some(d), i)) @@ -562,7 +563,8 @@ macro_rules! 
impl_common_protobuf_utils { .map(|&def| Self::def_inter_to_repdef_layer(def)) .collect(), num_items, - }, + support_large_chunk, + }, ), ), } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 26b418a1fc6..ff6a9043e0e 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -333,6 +333,7 @@ pub async fn check_round_trip_encoding_generated( cache_bytes_per_column: page_size, keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, + support_large_chunk: version >= LanceFileVersion::V2_2, }; encoding_strategy .create_field_encoder( @@ -712,6 +713,7 @@ pub async fn check_round_trip_encoding_of_data( max_page_bytes: test_cases.get_max_page_size(), keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, + support_large_chunk: file_version >= LanceFileVersion::V2_2, }; let encoder = encoding_strategy .create_field_encoder( diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index bd491f923a8..86152905dde 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -1718,6 +1718,7 @@ pub mod tests { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, + support_large_chunk: version >= LanceFileVersion::V2_2, }; let encoding_strategy = default_encoding_strategy(version); diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index d32cd6712e8..ff6c5a364da 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -314,11 +314,15 @@ impl FileWriter { default_encoding_strategy(version).into() }); + // when using v2.1, we use chunks with smaller chunk sizes in metadata + let support_large_chunk = self.version() >= LanceFileVersion::V2_2; + let encoding_options = EncodingOptions { cache_bytes_per_column, max_page_bytes, keep_original_array, buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64, + support_large_chunk, }; let encoder = BatchEncoder::try_new(&schema, 
encoding_strategy.as_ref(), &encoding_options)?; From 3cb33445730a46627a560c5e8307a2d0204db100 Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Mon, 13 Oct 2025 18:56:09 +0800 Subject: [PATCH 5/7] Make binary mini chunk size configurable via field metadata --- rust/lance-encoding/src/compression.rs | 31 ++++++- rust/lance-encoding/src/compression_config.rs | 9 ++ rust/lance-encoding/src/constants.rs | 2 + rust/lance-encoding/src/encoder.rs | 1 + .../src/encodings/logical/primitive.rs | 86 +++++++++++++++++-- .../src/encodings/physical/binary.rs | 46 +++++++--- .../src/encodings/physical/fsst.rs | 5 +- rust/lance-file/src/writer.rs | 1 + 8 files changed, 157 insertions(+), 24 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 1e7ca8a442d..6c3e2f2d9d7 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -307,7 +307,7 @@ impl DefaultCompressionStrategy { } /// Parse compression parameters from field metadata - fn parse_field_metadata(field: &Field) -> CompressionFieldParams { + fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams { let mut params = CompressionFieldParams::default(); // Parse compression method @@ -335,6 +335,27 @@ impl DefaultCompressionStrategy { } } + // Parse binary minichunk size + if let Some(minichunk_size_str) = field + .metadata + .get(super::constants::BINARY_MINICHUNK_SIZE_META_KEY) + { + if let Ok(minichunk_size) = minichunk_size_str.parse::() { + // for lance v2.1, only 32kb or smaller is supported + if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 { + log::warn!( + "Binary minichunk_size '{}' too large for version '{}', using default", + minichunk_size, + version + ); + } else { + params.binary_minichunk_size = Some(minichunk_size); + } + } else { + log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str); + } + } + params } @@ -392,7 +413,7 @@ impl 
DefaultCompressionStrategy { { Box::new(FsstMiniBlockEncoder::default()) } else { - Box::new(BinaryMiniBlockEncoder::default()) + Box::new(BinaryMiniBlockEncoder::new(params.binary_minichunk_size)) }; // 4. Apply general compression if configured @@ -415,7 +436,7 @@ impl DefaultCompressionStrategy { .get_field_params(&field.name, &field.data_type()); // Override with field metadata if present (highest priority) - let metadata_params = Self::parse_field_metadata(field); + let metadata_params = Self::parse_field_metadata(field, &self.version); field_params.merge(&metadata_params); field_params @@ -1105,6 +1126,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE + binary_minichunk_size: None, }, ); @@ -1136,6 +1158,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Off), // Disable BSS to test RLE + binary_minichunk_size: None, }, ); @@ -1259,6 +1282,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(6), bss: None, + binary_minichunk_size: None, }, ); @@ -1401,6 +1425,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, + binary_minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/compression_config.rs b/rust/lance-encoding/src/compression_config.rs index d8364bc9fc2..1095849add0 100644 --- a/rust/lance-encoding/src/compression_config.rs +++ b/rust/lance-encoding/src/compression_config.rs @@ -67,6 +67,9 @@ pub struct CompressionFieldParams { /// Byte stream split mode for floating point data pub bss: Option, + + /// Minichunk size threshold for binary encoding + pub binary_minichunk_size: Option, } impl CompressionParams { @@ -131,6 +134,9 @@ impl CompressionFieldParams { if other.bss.is_some() { self.bss = other.bss; } + if other.binary_minichunk_size.is_some() { + self.binary_minichunk_size = other.binary_minichunk_size; + } } } @@ -197,6 +203,7 @@ 
mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::On), + binary_minichunk_size: None, }; params.merge(&other); @@ -210,6 +217,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Auto), + binary_minichunk_size: None, }; params.merge(&another); @@ -241,6 +249,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: None, + binary_minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/constants.rs b/rust/lance-encoding/src/constants.rs index fc467e2be63..a5fb325113e 100644 --- a/rust/lance-encoding/src/constants.rs +++ b/rust/lance-encoding/src/constants.rs @@ -13,6 +13,8 @@ pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression"; pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level"; /// Metadata key for specifying RLE (Run-Length Encoding) threshold pub const RLE_THRESHOLD_META_KEY: &str = "lance-encoding:rle-threshold"; +/// Metadata key for specifying binary minichunk size +pub const BINARY_MINICHUNK_SIZE_META_KEY: &str = "lance-encoding:binary-minichunk-size"; // Dictionary encoding metadata keys /// Metadata key for specifying dictionary encoding threshold divisor diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 6c1766c26b3..a146acf404a 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -689,6 +689,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, + binary_minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 1421f4bc423..ed02e5832a3 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -1649,15 +1649,15 @@ enum WordsIter<'a> { impl Words { pub fn len(&self) -> usize { match self { - 
Words::U16(b) => b.len(), - Words::U32(b) => b.len(), + Self::U16(b) => b.len(), + Self::U32(b) => b.len(), } } pub fn iter(&self) -> WordsIter<'_> { match self { - Words::U16(buf) => WordsIter::U16(buf.iter()), - Words::U32(buf) => WordsIter::U32(buf.iter()), + Self::U16(buf) => WordsIter::U16(buf.iter()), + Self::U32(buf) => WordsIter::U32(buf.iter()), } } @@ -1665,11 +1665,11 @@ impl Words { if support_large_chunk { assert_eq!(bytes.len() % 4, 0); let buffer = LanceBuffer::from_bytes(bytes, 4); - Ok(Words::U32(buffer.borrow_to_typed_slice::())) + Ok(Self::U32(buffer.borrow_to_typed_slice::())) } else { assert_eq!(bytes.len() % 2, 0); let buffer = LanceBuffer::from_bytes(bytes, 2); - Ok(Words::U16(buffer.borrow_to_typed_slice::())) + Ok(Self::U16(buffer.borrow_to_typed_slice::())) } } } @@ -1679,8 +1679,8 @@ impl<'a> Iterator for WordsIter<'a> { fn next(&mut self) -> Option { match self { - WordsIter::U16(it) => it.next().map(|&x| x as u32), - WordsIter::U32(it) => it.next().map(|&x| x), + Self::U16(it) => it.next().map(|&x| x as u32), + Self::U32(it) => it.next().copied(), } } } @@ -5612,6 +5612,76 @@ mod tests { check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await } + #[tokio::test] + async fn test_binary_minichunk_size_roundtrip() { + use crate::constants::BINARY_MINICHUNK_SIZE_META_KEY; + use crate::testing::{check_round_trip_encoding_of_data, TestCases}; + use arrow_array::{ArrayRef, StringArray}; + use std::sync::Arc; + + // Test that binary minichunk size can be configured and works correctly in round-trip encoding + let string_data = vec![ + Some("hello".to_string()), + Some("world".to_string()), + Some("lance".to_string()), + Some("encoding".to_string()), + Some("test".to_string()), + ]; + let string_array: ArrayRef = Arc::new(StringArray::from(string_data)); + + // Create field with binary minichunk size specified in metadata + let mut metadata = HashMap::new(); + metadata.insert( + BINARY_MINICHUNK_SIZE_META_KEY.to_string(), 
+ "64".to_string(), // 64 bytes minichunk size + ); + metadata.insert( + STRUCTURAL_ENCODING_META_KEY.to_string(), + STRUCTURAL_ENCODING_MINIBLOCK.to_string(), + ); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_1) + .with_batch_size(1000); + + // This should work without errors and respect the minichunk size parameter + check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await; + } + + #[tokio::test] + async fn test_binary_minichunk_size_128kb_v2_2() { + use crate::constants::BINARY_MINICHUNK_SIZE_META_KEY; + use crate::testing::{check_round_trip_encoding_of_data, TestCases}; + use arrow_array::{ArrayRef, StringArray}; + use std::sync::Arc; + + // Test that binary minichunk size can be configured to 128KB and works correctly with Lance 2.2 + // Create larger string data to better test the chunk size configuration + let mut string_data = Vec::new(); + for i in 0..100 { + string_data.push(Some(format!("test_string_{}", i).repeat(50))); // Create larger strings + } + let string_array: ArrayRef = Arc::new(StringArray::from(string_data)); + + // Create field with binary minichunk size specified in metadata (128KB = 131072 bytes) + let mut metadata = HashMap::new(); + metadata.insert( + BINARY_MINICHUNK_SIZE_META_KEY.to_string(), + (128 * 1024).to_string(), + ); + metadata.insert( + STRUCTURAL_ENCODING_META_KEY.to_string(), + STRUCTURAL_ENCODING_MINIBLOCK.to_string(), + ); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_batch_size(1000); + + // This should work without errors and respect the 128KB minichunk size parameter with Lance 2.2 + check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await; + } + #[tokio::test] async fn test_large_dictionary_general_compression() { use arrow_array::{ArrayRef, StringArray}; diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs 
index 39cdc3d9a71..c90e35cc919 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -31,13 +31,23 @@ use crate::format::{pb21, ProtobufUtils21}; use lance_core::utils::bit::pad_bytes_to; use lance_core::{Error, Result}; -#[derive(Debug, Default)] -pub struct BinaryMiniBlockEncoder {} +#[derive(Debug)] +pub struct BinaryMiniBlockEncoder { + minichunk_size: i64, +} + +impl Default for BinaryMiniBlockEncoder { + fn default() -> Self { + Self { + minichunk_size: *AIM_MINICHUNK_SIZE, + } + } +} const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024; -static AIM_MINICHUNK_SIZE: std::sync::LazyLock = std::sync::LazyLock::new(|| { - std::env::var("LANCE_MINIBLOCK_CHUNK_SIZE") +pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE") .unwrap_or_else(|_| DEFAULT_AIM_MINICHUNK_SIZE.to_string()) .parse::() .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE) @@ -48,6 +58,7 @@ fn chunk_offsets( offsets: &[N], data: &[u8], alignment: usize, + minichunk_size: i64, ) -> (Vec, Vec) { #[derive(Debug)] struct ChunkInfo { @@ -67,7 +78,8 @@ fn chunk_offsets( let mut chunks = vec![]; let mut last_offset_in_orig_idx = 0; loop { - let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx); + let this_last_offset_in_orig_idx = + search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size); let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx; let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx]; @@ -142,7 +154,11 @@ fn chunk_offsets( // this function incrementally peek the number of values in a chunk, // each time multiplies the number of values by 2. // It returns the offset_idx in `offsets` that belongs to this chunk. 
-fn search_next_offset_idx(offsets: &[N], last_offset_idx: usize) -> usize { +fn search_next_offset_idx( + offsets: &[N], + last_offset_idx: usize, + minichunk_size: i64, +) -> usize { let mut num_values = 1; let mut new_num_values = num_values * 2; loop { @@ -151,7 +167,7 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us // existing bytes plus the new offset size let new_size = existing_bytes + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= *AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= minichunk_size { // case 1: can fit the rest of all data into a miniblock return offsets.len() - 1; } else { @@ -162,7 +178,7 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx]; let new_size = existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap(); - if new_size.to_i64().unwrap() <= *AIM_MINICHUNK_SIZE { + if new_size.to_i64().unwrap() <= minichunk_size { if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize { // hit the max number of values limit break; @@ -177,7 +193,13 @@ fn search_next_offset_idx(offsets: &[N], last_offset_idx: us } impl BinaryMiniBlockEncoder { - // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`. + pub fn new(minichunk_size: Option) -> Self { + Self { + minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE), + } + } + + // put binary data into chunks, every chunk is less than or equal to `minichunk_size`. // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes. // the offsets in the chunk points to the bytes offset in this chunk. 
fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) { @@ -186,7 +208,8 @@ impl BinaryMiniBlockEncoder { match data.bits_per_offset { 32 => { let offsets = data.offsets.borrow_to_typed_slice::(); - let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4); + let (buffers, chunks) = + chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size); ( MiniBlockCompressed { data: buffers, @@ -198,7 +221,8 @@ impl BinaryMiniBlockEncoder { } 64 => { let offsets = data.offsets.borrow_to_typed_slice::(); - let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8); + let (buffers, chunks) = + chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size); ( MiniBlockCompressed { data: buffers, diff --git a/rust/lance-encoding/src/encodings/physical/fsst.rs b/rust/lance-encoding/src/encodings/physical/fsst.rs index c74a3093b0f..62290371b39 100644 --- a/rust/lance-encoding/src/encodings/physical/fsst.rs +++ b/rust/lance-encoding/src/encodings/physical/fsst.rs @@ -138,8 +138,9 @@ impl MiniBlockCompressor for FsstMiniBlockEncoder { let data_block = DataBlock::VariableWidth(compressed.data); // compress the fsst compressed data using `BinaryMiniBlockEncoder` - let binary_compressor = - Box::new(BinaryMiniBlockEncoder::default()) as Box; + let binary_minichunk_size = *super::binary::AIM_MINICHUNK_SIZE; + let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(Some(binary_minichunk_size))) + as Box; let (binary_miniblock_compressed, binary_array_encoding) = binary_compressor.compress(data_block)?; diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index ff6c5a364da..ab90c3d0c86 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -1047,6 +1047,7 @@ mod tests { compression: None, // Will use default compression if any compression_level: None, bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used 
+ binary_minichunk_size: None, }, ); From 4b71c30ffc28cd6b91de5b588d18eeae292031bf Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Thu, 23 Oct 2025 10:39:27 +0800 Subject: [PATCH 6/7] Refactor WordsIter to be a struct so that we can avoid `match` U16/U32 for every `next` and this is more efficient during iteration. --- .../src/encodings/logical/primitive.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index ed02e5832a3..533965a9b30 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -1641,9 +1641,8 @@ enum Words { U32(ScalarBuffer), } -enum WordsIter<'a> { - U16(std::slice::Iter<'a, u16>), - U32(std::slice::Iter<'a, u32>), +struct WordsIter<'a> { + iter: Box + 'a>, } impl Words { @@ -1656,8 +1655,12 @@ impl Words { pub fn iter(&self) -> WordsIter<'_> { match self { - Self::U16(buf) => WordsIter::U16(buf.iter()), - Self::U32(buf) => WordsIter::U32(buf.iter()), + Self::U16(buf) => WordsIter { + iter: Box::new(buf.iter().map(|&x| x as u32)), + }, + Self::U32(buf) => WordsIter { + iter: Box::new(buf.iter().copied()), + }, } } @@ -1678,10 +1681,7 @@ impl<'a> Iterator for WordsIter<'a> { type Item = u32; fn next(&mut self) -> Option { - match self { - Self::U16(it) => it.next().map(|&x| x as u32), - Self::U32(it) => it.next().copied(), - } + self.iter.next() } } From 694aaf78173613a4bbefc9ca23594f49f297a2ea Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Wed, 22 Oct 2025 18:08:41 +0800 Subject: [PATCH 7/7] Rename the parameter to has_large_chunk for decoding path, and use minichunk_size instead of binary_minichunk_size to make it more general. 
--- protos/encodings_v2_1.proto | 4 +- rust/lance-encoding/benches/decoder.rs | 20 +-- rust/lance-encoding/src/compression.rs | 24 +-- rust/lance-encoding/src/compression_config.rs | 14 +- rust/lance-encoding/src/constants.rs | 4 +- rust/lance-encoding/src/encoder.rs | 20 ++- .../src/encodings/logical/primitive.rs | 143 +++++++++--------- .../encodings/logical/primitive/miniblock.rs | 3 +- .../src/encodings/physical/fsst.rs | 14 +- rust/lance-encoding/src/format.rs | 4 +- rust/lance-encoding/src/testing.rs | 4 +- rust/lance-file/src/reader.rs | 2 +- rust/lance-file/src/writer.rs | 7 +- 13 files changed, 134 insertions(+), 129 deletions(-) diff --git a/protos/encodings_v2_1.proto b/protos/encodings_v2_1.proto index b24df0db811..ad33d9d2945 100644 --- a/protos/encodings_v2_1.proto +++ b/protos/encodings_v2_1.proto @@ -113,8 +113,8 @@ message MiniBlockLayout { // many "items" are in the page. A row and an item are the same thing unless the page has lists. uint64 num_items = 9; - // Since Lance 2.2, we support miniblocks with larger chunk size (>= 64KB) - bool support_large_chunk = 10; + // Since Lance 2.2, miniblocks have larger chunk sizes (>= 64KB) + bool has_large_chunk = 10; } // A layout used for pages where the data is large diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index ef3492658bb..16d3faa9d97 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -45,16 +45,6 @@ const PRIMITIVE_TYPES: &[DataType] = &[ // schema doesn't yet parse them in the context of a fixed size list. 
const PRIMITIVE_TYPES_FOR_FSL: &[DataType] = &[DataType::Int8, DataType::Float32]; -fn default_encoding_options(version: LanceFileVersion) -> EncodingOptions { - EncodingOptions { - cache_bytes_per_column: 8 * 1024 * 1024, - max_page_bytes: 32 * 1024 * 1024, - keep_original_array: true, - buffer_alignment: 64, - support_large_chunk: version >= LanceFileVersion::V2_2, - } -} - fn bench_decode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let mut group = c.benchmark_group("decode_primitive"); @@ -76,7 +66,7 @@ fn bench_decode(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &default_encoding_options(LanceFileVersion::default()), + &EncodingOptions::default(), )) .unwrap(); @@ -141,7 +131,7 @@ fn bench_decode_fsl(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &default_encoding_options(LanceFileVersion::default()), + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { @@ -207,7 +197,7 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &default_encoding_options(LanceFileVersion::default()), + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { @@ -282,7 +272,7 @@ fn bench_decode_packed_struct(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &default_encoding_options(LanceFileVersion::default()), + &EncodingOptions::default(), )) .unwrap(); @@ -339,7 +329,7 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { &data, lance_schema, encoding_strategy.as_ref(), - &default_encoding_options(LanceFileVersion::default()), + &EncodingOptions::default(), )) .unwrap(); b.iter(|| { diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 6c3e2f2d9d7..bf106d9aa30 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -335,21 +335,21 @@ impl DefaultCompressionStrategy { } } - // Parse binary minichunk size + // 
Parse minichunk size if let Some(minichunk_size_str) = field .metadata - .get(super::constants::BINARY_MINICHUNK_SIZE_META_KEY) + .get(super::constants::MINICHUNK_SIZE_META_KEY) { if let Ok(minichunk_size) = minichunk_size_str.parse::() { // for lance v2.1, only 32kb or smaller is supported if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 { log::warn!( - "Binary minichunk_size '{}' too large for version '{}', using default", + "minichunk_size '{}' too large for version '{}', using default", minichunk_size, version ); } else { - params.binary_minichunk_size = Some(minichunk_size); + params.minichunk_size = Some(minichunk_size); } } else { log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str); @@ -398,12 +398,12 @@ impl DefaultCompressionStrategy { // 1. Check for explicit "none" compression if params.compression.as_deref() == Some("none") { - return Ok(Box::new(BinaryMiniBlockEncoder::default())); + return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))); } // 2. Check for explicit "fsst" compression if params.compression.as_deref() == Some("fsst") { - return Ok(Box::new(FsstMiniBlockEncoder::default())); + return Ok(Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))); } // 3. Choose base encoder (FSST or Binary) based on data characteristics @@ -411,9 +411,9 @@ impl DefaultCompressionStrategy { >= FSST_LEAST_INPUT_MAX_LENGTH && data_size >= FSST_LEAST_INPUT_SIZE as u64 { - Box::new(FsstMiniBlockEncoder::default()) + Box::new(FsstMiniBlockEncoder::new(params.minichunk_size)) } else { - Box::new(BinaryMiniBlockEncoder::new(params.binary_minichunk_size)) + Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)) }; // 4. 
Apply general compression if configured @@ -1126,7 +1126,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::Off), // Explicitly disable BSS to test RLE - binary_minichunk_size: None, + minichunk_size: None, }, ); @@ -1158,7 +1158,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Off), // Disable BSS to test RLE - binary_minichunk_size: None, + minichunk_size: None, }, ); @@ -1282,7 +1282,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(6), bss: None, - binary_minichunk_size: None, + minichunk_size: None, }, ); @@ -1425,7 +1425,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, - binary_minichunk_size: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/compression_config.rs b/rust/lance-encoding/src/compression_config.rs index 1095849add0..4aee75b2104 100644 --- a/rust/lance-encoding/src/compression_config.rs +++ b/rust/lance-encoding/src/compression_config.rs @@ -68,8 +68,8 @@ pub struct CompressionFieldParams { /// Byte stream split mode for floating point data pub bss: Option, - /// Minichunk size threshold for binary encoding - pub binary_minichunk_size: Option, + /// Minichunk size threshold for encoding + pub minichunk_size: Option, } impl CompressionParams { @@ -134,8 +134,8 @@ impl CompressionFieldParams { if other.bss.is_some() { self.bss = other.bss; } - if other.binary_minichunk_size.is_some() { - self.binary_minichunk_size = other.binary_minichunk_size; + if other.minichunk_size.is_some() { + self.minichunk_size = other.minichunk_size; } } } @@ -203,7 +203,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: Some(BssMode::On), - binary_minichunk_size: None, + minichunk_size: None, }; params.merge(&other); @@ -217,7 +217,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: Some(BssMode::Auto), - 
binary_minichunk_size: None, + minichunk_size: None, }; params.merge(&another); @@ -249,7 +249,7 @@ mod tests { compression: Some("zstd".to_string()), compression_level: Some(3), bss: None, - binary_minichunk_size: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/constants.rs b/rust/lance-encoding/src/constants.rs index a5fb325113e..173b1a1c085 100644 --- a/rust/lance-encoding/src/constants.rs +++ b/rust/lance-encoding/src/constants.rs @@ -13,8 +13,8 @@ pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression"; pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level"; /// Metadata key for specifying RLE (Run-Length Encoding) threshold pub const RLE_THRESHOLD_META_KEY: &str = "lance-encoding:rle-threshold"; -/// Metadata key for specifying binary minichunk size -pub const BINARY_MINICHUNK_SIZE_META_KEY: &str = "lance-encoding:binary-minichunk-size"; +/// Metadata key for specifying minichunk size +pub const MINICHUNK_SIZE_META_KEY: &str = "lance-encoding:minichunk-size"; // Dictionary encoding metadata keys /// Metadata key for specifying dictionary encoding threshold divisor diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index a146acf404a..b99369729fd 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -233,10 +233,9 @@ pub struct EncodingOptions { /// The encoder needs to know this so it figures the position of out-of-line /// buffers correctly pub buffer_alignment: u64, - /// If true (the default in Lance 2.2+), miniblock buffer sizes are u32 - /// to allow storing bigger chunk size for better compression. - /// For Lance 2.1, this is false, and miniblock buffer sizes are u16. 
- pub support_large_chunk: bool, + + /// The Lance file version being written + pub version: LanceFileVersion, } impl Default for EncodingOptions { @@ -246,11 +245,20 @@ impl Default for EncodingOptions { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, - support_large_chunk: LanceFileVersion::default() >= LanceFileVersion::V2_2, + version: LanceFileVersion::default(), } } } +impl EncodingOptions { + /// If true (for Lance file version 2.2+), miniblock chunk sizes are u32, + /// to allow storing larger chunks and their sizes for better compression. + /// For Lance file version 2.1, miniblock chunk sizes are u16. + pub fn support_large_chunk(&self) -> bool { + self.version >= LanceFileVersion::V2_2 + } +} + /// A trait to pick which kind of field encoding to use for a field /// /// Unlike the ArrayEncodingStrategy, the field encoding strategy is @@ -689,7 +697,7 @@ mod tests { compression: Some("lz4".to_string()), compression_level: None, bss: None, - binary_minichunk_size: None, + minichunk_size: None, }, ); diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 533965a9b30..0381decc967 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -155,7 +155,7 @@ struct DecodeMiniBlockTask { num_buffers: u64, max_visible_level: u16, instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>, - support_large_chunk: bool, + has_large_chunk: bool, } impl DecodeMiniBlockTask { @@ -427,6 +427,28 @@ impl DecodeMiniBlockTask { } } + // read `num_buffers` buffer sizes from `buf` starting at `offset` + fn read_buffer_sizes( + buf: &[u8], + offset: &mut usize, + num_buffers: u64, + ) -> Vec { + let read_size = if LARGE { 4 } else { 2 }; + (0..num_buffers) + .map(|_| { + let bytes = &buf[*offset..*offset + read_size]; + let size = if LARGE { + u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 
bytes[3]]) + } else { + // the buffer size is read from u16 but is stored as u32 after decoding for consistency + u16::from_le_bytes([bytes[0], bytes[1]]) as u32 + }; + *offset += read_size; + size + }) + .collect() + } + // Unserialize a miniblock into a collection of vectors fn decode_miniblock_chunk( &self, @@ -452,21 +474,12 @@ impl DecodeMiniBlockTask { None }; - let read_size = if self.support_large_chunk { 4 } else { 2 }; - let read_fn: fn(&[u8]) -> u32 = if self.support_large_chunk { - |bytes| u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + let buffer_sizes = if self.has_large_chunk { + Self::read_buffer_sizes::(buf, &mut offset, self.num_buffers) } else { - |bytes| u16::from_le_bytes([bytes[0], bytes[1]]) as u32 + Self::read_buffer_sizes::(buf, &mut offset, self.num_buffers) }; - let buffer_sizes = (0..self.num_buffers) - .map(|_| { - let size = read_fn(&buf[offset..offset + read_size]); - offset += read_size; - size - }) - .collect::>(); - offset += pad_bytes::(offset); let rep = rep_size.map(|rep_size| { @@ -674,7 +687,7 @@ struct MiniBlockDecoder { num_rows: u64, num_buffers: u64, dictionary: Option>, - support_large_chunk: bool, + has_large_chunk: bool, } /// See [`MiniBlockScheduler`] for more details on the scheduling and decoding @@ -722,7 +735,7 @@ impl StructuralPageDecoder for MiniBlockDecoder { def_meaning: self.def_meaning.clone(), num_buffers: self.num_buffers, max_visible_level, - support_large_chunk: self.support_large_chunk, + has_large_chunk: self.has_large_chunk, })) } @@ -1224,7 +1237,7 @@ pub struct MiniBlockScheduler { dictionary: Option, // This is set after initialization page_meta: Option>, - support_large_chunk: bool, + has_large_chunk: bool, } impl MiniBlockScheduler { @@ -1310,7 +1323,7 @@ impl MiniBlockScheduler { dictionary, def_meaning: def_meaning.into(), page_meta: None, - support_large_chunk: layout.support_large_chunk, + has_large_chunk: layout.has_large_chunk, }) } @@ -1664,14 +1677,13 @@ impl Words { } } - 
pub fn from_bytes(bytes: Bytes, support_large_chunk: bool) -> Result { - if support_large_chunk { - assert_eq!(bytes.len() % 4, 0); - let buffer = LanceBuffer::from_bytes(bytes, 4); + pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result { + let bytes_per_value = if has_large_chunk { 4 } else { 2 }; + assert_eq!(bytes.len() % bytes_per_value, 0); + let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64); + if has_large_chunk { Ok(Self::U32(buffer.borrow_to_typed_slice::())) } else { - assert_eq!(bytes.len() % 2, 0); - let buffer = LanceBuffer::from_bytes(bytes, 2); Ok(Self::U16(buffer.borrow_to_typed_slice::())) } } @@ -1724,7 +1736,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let rep_index_bytes = buffers.next(); // Parse the metadata and build the chunk meta - let words = Words::from_bytes(meta_bytes, self.support_large_chunk)?; + let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?; let mut chunk_meta = Vec::with_capacity(words.len()); let mut rows_counter = 0; @@ -1834,7 +1846,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let def_decompressor = self.def_decompressor.clone(); let value_decompressor = self.value_decompressor.clone(); let num_buffers = self.num_buffers; - let support_large_chunk = self.support_large_chunk; + let has_large_chunk = self.has_large_chunk; let dictionary = page_meta .dictionary .as_ref() @@ -1858,7 +1870,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { dictionary, num_rows, num_buffers, - support_large_chunk, + has_large_chunk, }) as Box) } .boxed(); @@ -3440,7 +3452,7 @@ impl PrimitiveStructuralEncoder { column_index, options.keep_original_array, ), - support_large_chunk: options.support_large_chunk, + support_large_chunk: options.support_large_chunk(), keep_original_array: options.keep_original_array, accumulated_repdefs: Vec::new(), column_index, @@ -3637,7 +3649,7 @@ impl PrimitiveStructuralEncoder { let chunk_bytes = data_buffer.len() - start_pos; let 
max_chunk_size = if support_large_chunk { - 2 * 1024 * 1024 * 1024 // 2GB limit with u32 metadata + 4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata } else { 32 * 1024 // 32KiB limit with u16 metadata }; @@ -5612,28 +5624,22 @@ mod tests { check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await } - #[tokio::test] - async fn test_binary_minichunk_size_roundtrip() { - use crate::constants::BINARY_MINICHUNK_SIZE_META_KEY; + async fn test_minichunk_size_helper( + string_data: Vec>, + minichunk_size: u64, + file_version: LanceFileVersion, + ) { + use crate::constants::MINICHUNK_SIZE_META_KEY; use crate::testing::{check_round_trip_encoding_of_data, TestCases}; use arrow_array::{ArrayRef, StringArray}; use std::sync::Arc; - // Test that binary minichunk size can be configured and works correctly in round-trip encoding - let string_data = vec![ - Some("hello".to_string()), - Some("world".to_string()), - Some("lance".to_string()), - Some("encoding".to_string()), - Some("test".to_string()), - ]; let string_array: ArrayRef = Arc::new(StringArray::from(string_data)); - // Create field with binary minichunk size specified in metadata let mut metadata = HashMap::new(); metadata.insert( - BINARY_MINICHUNK_SIZE_META_KEY.to_string(), - "64".to_string(), // 64 bytes minichunk size + MINICHUNK_SIZE_META_KEY.to_string(), + minichunk_size.to_string(), ); metadata.insert( STRUCTURAL_ENCODING_META_KEY.to_string(), @@ -5641,45 +5647,42 @@ mod tests { ); let test_cases = TestCases::default() - .with_min_file_version(LanceFileVersion::V2_1) + .with_min_file_version(file_version) .with_batch_size(1000); - // This should work without errors and respect the minichunk size parameter check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await; } #[tokio::test] - async fn test_binary_minichunk_size_128kb_v2_2() { - use crate::constants::BINARY_MINICHUNK_SIZE_META_KEY; - use crate::testing::{check_round_trip_encoding_of_data, TestCases}; - use 
arrow_array::{ArrayRef, StringArray}; - use std::sync::Arc; - - // Test that binary minichunk size can be configured to 128KB and works correctly with Lance 2.2 - // Create larger string data to better test the chunk size configuration + async fn test_minichunk_size_roundtrip() { + // Test that minichunk size can be configured and works correctly in round-trip encoding let mut string_data = Vec::new(); for i in 0..100 { - string_data.push(Some(format!("test_string_{}", i).repeat(50))); // Create larger strings + string_data.push(Some(format!("test_string_{}", i).repeat(50))); } - let string_array: ArrayRef = Arc::new(StringArray::from(string_data)); - - // Create field with binary minichunk size specified in metadata (128KB = 131072 bytes) - let mut metadata = HashMap::new(); - metadata.insert( - BINARY_MINICHUNK_SIZE_META_KEY.to_string(), - (128 * 1024).to_string(), - ); - metadata.insert( - STRUCTURAL_ENCODING_META_KEY.to_string(), - STRUCTURAL_ENCODING_MINIBLOCK.to_string(), - ); + // configure minichunk size to 64 bytes (smaller than the default 4kb) for Lance 2.1 + test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await; + } - let test_cases = TestCases::default() - .with_min_file_version(LanceFileVersion::V2_2) - .with_batch_size(1000); + #[tokio::test] + async fn test_minichunk_size_128kb_v2_2() { + // Test that minichunk size can be configured to 128KB and works correctly with Lance 2.2 + let mut string_data = Vec::new(); + // create a 500kb string array + for i in 0..10000 { + string_data.push(Some(format!("test_string_{}", i).repeat(50))); + } + test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await; + } - // This should work without errors and respect the 128KB minichunk size parameter with Lance 2.2 - check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await; + #[tokio::test] + async fn test_binary_large_minichunk_size_over_max_miniblock_values() { + let mut string_data = Vec::new(); 
+ // 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk + for i in 0..10000 { + string_data.push(Some(format!("t_{}", i))); + } + test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await; } #[tokio::test] diff --git a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs index 7332c0604cc..6da985e9ec0 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs @@ -44,7 +44,8 @@ pub struct MiniBlockCompressed { pub struct MiniBlockChunk { // The size in bytes of each buffer in the chunk. // - // for Lance 2.1, the chunk size is limited to 32KiB so only 16-bit is stored here and used. + // In Lance 2.1, the chunk size is limited to 32KiB, so only 16-bits are used. + // Since Lance 2.2, the chunk size uses u32 to support larger chunk size pub buffer_sizes: Vec, // The log (base 2) of the number of values in the chunk. 
If this is the final chunk // then this should be 0 (the number of values will be calculated by subtracting the diff --git a/rust/lance-encoding/src/encodings/physical/fsst.rs b/rust/lance-encoding/src/encodings/physical/fsst.rs index 62290371b39..25dac9b7896 100644 --- a/rust/lance-encoding/src/encodings/physical/fsst.rs +++ b/rust/lance-encoding/src/encodings/physical/fsst.rs @@ -129,7 +129,15 @@ impl FsstCompressed { } #[derive(Debug, Default)] -pub struct FsstMiniBlockEncoder {} +pub struct FsstMiniBlockEncoder { + minichunk_size: Option, +} + +impl FsstMiniBlockEncoder { + pub fn new(minichunk_size: Option) -> Self { + Self { minichunk_size } + } +} impl MiniBlockCompressor for FsstMiniBlockEncoder { fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> { @@ -138,8 +146,7 @@ impl MiniBlockCompressor for FsstMiniBlockEncoder { let data_block = DataBlock::VariableWidth(compressed.data); // compress the fsst compressed data using `BinaryMiniBlockEncoder` - let binary_minichunk_size = *super::binary::AIM_MINICHUNK_SIZE; - let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(Some(binary_minichunk_size))) + let binary_compressor = Box::new(BinaryMiniBlockEncoder::new(self.minichunk_size)) as Box; let (binary_miniblock_compressed, binary_array_encoding) = @@ -368,7 +375,6 @@ impl MiniBlockDecompressor for FsstMiniBlockDecompressor { #[cfg(test)] mod tests { - use std::collections::HashMap; use lance_datagen::{ByteCount, RowCount}; diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 54fc12f800f..30b1220826e 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -541,7 +541,7 @@ macro_rules! 
impl_common_protobuf_utils { )>, def_meaning: &[DefinitionInterpretation], num_items: u64, - support_large_chunk: bool, + has_large_chunk: bool, ) -> crate::format::$module::PageLayout { assert!(!def_meaning.is_empty()); let (dictionary, num_dictionary_items) = dictionary_encoding @@ -563,7 +563,7 @@ macro_rules! impl_common_protobuf_utils { .map(|&def| Self::def_inter_to_repdef_layer(def)) .collect(), num_items, - support_large_chunk, + has_large_chunk, }, ), ), diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index ff6a9043e0e..1b42db71477 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -333,7 +333,7 @@ pub async fn check_round_trip_encoding_generated( cache_bytes_per_column: page_size, keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, - support_large_chunk: version >= LanceFileVersion::V2_2, + version, }; encoding_strategy .create_field_encoder( @@ -713,7 +713,7 @@ pub async fn check_round_trip_encoding_of_data( max_page_bytes: test_cases.get_max_page_size(), keep_original_array: true, buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, - support_large_chunk: file_version >= LanceFileVersion::V2_2, + version: file_version, }; let encoder = encoding_strategy .create_field_encoder( diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 86152905dde..0c7f69ae88a 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -1718,7 +1718,7 @@ pub mod tests { max_page_bytes: 32 * 1024 * 1024, keep_original_array: true, buffer_alignment: 64, - support_large_chunk: version >= LanceFileVersion::V2_2, + version, }; let encoding_strategy = default_encoding_strategy(version); diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index ab90c3d0c86..50d5ba819bf 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -314,15 +314,12 @@ impl FileWriter { 
default_encoding_strategy(version).into() }); - // when using v2.1, we use chunks with smaller chunk sizes in metadata - let support_large_chunk = self.version() >= LanceFileVersion::V2_2; - let encoding_options = EncodingOptions { cache_bytes_per_column, max_page_bytes, keep_original_array, buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64, - support_large_chunk, + version: self.version(), }; let encoder = BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?; @@ -1047,7 +1044,7 @@ mod tests { compression: None, // Will use default compression if any compression_level: None, bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used - binary_minichunk_size: None, + minichunk_size: None, }, );