diff --git a/protos/encodings_v2_1.proto b/protos/encodings_v2_1.proto index 83c8c771227..46fd012fb58 100644 --- a/protos/encodings_v2_1.proto +++ b/protos/encodings_v2_1.proto @@ -166,6 +166,17 @@ message ConstantLayout { // - MUST be absent for an all-null page // - MUST be <= 32 bytes if present optional bytes inline_value = 6; + + // Optional compression algorithm used for the repetition buffer. + // If absent, repetition levels are stored as raw u16 values. + CompressiveEncoding rep_compression = 7; + // Optional compression algorithm used for the definition buffer. + // If absent, definition levels are stored as raw u16 values. + CompressiveEncoding def_compression = 8; + // Number of values in repetition buffer after decompression. + uint64 num_rep_values = 9; + // Number of values in definition buffer after decompression. + uint64 num_def_values = 10; } // A layout where large binary data is encoded externally and only diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 9e22c414d48..9c3e54ce3af 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -779,12 +779,20 @@ pub struct ComplexAllNullScheduler { def_meaning: Arc<[DefinitionInterpretation]>, repdef: Option>, max_visible_level: u16, + rep_decompressor: Option>, + def_decompressor: Option>, + num_rep_values: u64, + num_def_values: u64, } impl ComplexAllNullScheduler { pub fn new( buffer_offsets_and_sizes: Arc<[(u64, u64)]>, def_meaning: Arc<[DefinitionInterpretation]>, + rep_decompressor: Option>, + def_decompressor: Option>, + num_rep_values: u64, + num_def_values: u64, ) -> Self { let max_visible_level = def_meaning .iter() @@ -796,6 +804,10 @@ impl ComplexAllNullScheduler { def_meaning, repdef: None, max_visible_level, + rep_decompressor, + def_decompressor, + num_rep_values, + num_def_values, } } } @@ -820,25 +832,89 @@ impl StructuralPageScheduler for ComplexAllNullScheduler { } let data = io.submit_request(reads, 0); + let rep_decompressor = self.rep_decompressor.clone(); + let def_decompressor = self.def_decompressor.clone(); + let num_rep_values = self.num_rep_values; + let num_def_values = self.num_def_values; async move { let data = data.await?; let mut data_iter = data.into_iter(); + let decompress_levels = |compressed_bytes: Bytes, + decompressor: &Arc, + num_values: u64, + level_type: &str| + -> Result> { + let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1); + let decompressed = decompressor.decompress(compressed_buffer, num_values)?; + match decompressed { + DataBlock::FixedWidth(block) => { + if block.num_values != num_values { + return Err(Error::InvalidInput { + source: format!( + "Unexpected {} level count after decompression: expected {}, got {}", + level_type, num_values, block.num_values + ) + .into(), + location: location!(), + }); + } + if block.bits_per_value != 16 { + return Err(Error::InvalidInput { + source: format!( + "Unexpected {} level bit width after decompression: expected 16, got {}", + level_type, block.bits_per_value + ) + .into(), + location: location!(), + }); + } + Ok(block.data.borrow_to_typed_slice::()) + } + _ => Err(Error::InvalidInput { + source: format!( + "Expected fixed-width data block for {} levels", + level_type + ) + .into(), + location: location!(), + }), + } + }; + let rep = if has_rep { let rep = data_iter.next().unwrap(); - let rep = LanceBuffer::from_bytes(rep, 2); - let rep = rep.borrow_to_typed_slice::(); - Some(rep) + if let Some(rep_decompressor) = rep_decompressor.as_ref() { + Some(decompress_levels( + rep, + rep_decompressor, + num_rep_values, + "repetition", + )?) + } else { + let rep = LanceBuffer::from_bytes(rep, 2); + let rep = rep.borrow_to_typed_slice::(); + Some(rep) + } } else { None }; let def = if has_def { let def = data_iter.next().unwrap(); - let def = LanceBuffer::from_bytes(def, 2); - let def = def.borrow_to_typed_slice::(); - Some(def) + if let Some(def_decompressor) = def_decompressor.as_ref() { + Some(decompress_levels( + def, + def_decompressor, + num_def_values, + "definition", + )?) + } else { + let def = LanceBuffer::from_bytes(def, 2); + let def = def.borrow_to_typed_slice::(); + Some(def) + } } else { None }; @@ -3093,9 +3169,27 @@ impl StructuralPrimitiveFieldScheduler { { Box::new(SimpleAllNullScheduler::default()) as Box } else { + let rep_decompressor = constant_layout + .rep_compression + .as_ref() + .map(|encoding| decompressors.create_block_decompressor(encoding)) + .transpose()? + .map(Arc::from); + + let def_decompressor = constant_layout + .def_compression + .as_ref() + .map(|encoding| decompressors.create_block_decompressor(encoding)) + .transpose()? + .map(Arc::from); + Box::new(ComplexAllNullScheduler::new( page_info.buffer_offsets_and_sizes.clone(), def_meaning.into(), + rep_decompressor, + def_decompressor, + constant_layout.num_rep_values, + constant_layout.num_def_values, )) as Box } } @@ -3833,6 +3927,27 @@ impl PrimitiveStructuralEncoder { }) } + fn encode_complex_all_null_vals( + data: &Arc<[u16]>, + compression_strategy: &dyn CompressionStrategy, + ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> { + let buffer = LanceBuffer::reinterpret_slice(data.clone()); + let mut fixed_width_block = FixedWidthDataBlock { + data: buffer, + bits_per_value: 16, + num_values: data.len() as u64, + block_info: BlockInfo::new(), + }; + fixed_width_block.compute_stat(); + + let levels_block = DataBlock::FixedWidth(fixed_width_block); + let levels_field = Field::new_arrow("", DataType::UInt16, false)?; + let (compressor, encoding) = + compression_strategy.create_block_compressor(&levels_field, &levels_block)?; + let compressed_buffer = compressor.compress(levels_block)?; + Ok((compressed_buffer, encoding)) + } + // Encodes a page where all values are null but we have rep/def // information that we need to store (e.g. to distinguish between // different kinds of null) @@ -3841,21 +3956,59 @@ impl PrimitiveStructuralEncoder { repdef: crate::repdef::SerializedRepDefs, row_number: u64, num_rows: u64, + version: LanceFileVersion, + compression_strategy: &dyn CompressionStrategy, ) -> Result { - // TODO: Actually compress repdef - let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() { - LanceBuffer::reinterpret_slice(rep.clone()) + if version.resolve() < LanceFileVersion::V2_2 { + let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() { + LanceBuffer::reinterpret_slice(rep.clone()) + } else { + LanceBuffer::empty() + }; + + let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() { + LanceBuffer::reinterpret_slice(def.clone()) + } else { + LanceBuffer::empty() + }; + + let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None); + return Ok(EncodedPage { + column_idx, + data: vec![rep_bytes, def_bytes], + description: PageEncoding::Structural(description), + num_rows, + row_number, + }); + } + + let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) = + repdef.repetition_levels.as_ref() + { + let num_values = rep.len() as u64; + let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?; + (buffer, Some(encoding), num_values) } else { - LanceBuffer::empty() + (LanceBuffer::empty(), None, 0) }; - let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() { - LanceBuffer::reinterpret_slice(def.clone()) + let (def_bytes, def_encoding, num_def_values) = if let Some(def) = + repdef.definition_levels.as_ref() + { + let num_values = def.len() as u64; + let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?; + (buffer, Some(encoding), num_values) } else { - LanceBuffer::empty() + (LanceBuffer::empty(), None, 0) }; - let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None); + let description = ProtobufUtils21::compressed_all_null_constant_layout( + &repdef.def_meaning, + rep_encoding, + def_encoding, + num_rep_values, + num_def_values, + ); Ok(EncodedPage { column_idx, data: vec![rep_bytes, def_bytes], @@ -4631,7 +4784,14 @@ impl PrimitiveStructuralEncoder { // either have all empty lists or all null lists (or a mix). We still need to encode // the rep/def information but we can skip the data encoding. log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows); - return Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows); + return Self::encode_complex_all_null( + column_idx, + repdef, + row_number, + num_rows, + version, + compression_strategy.as_ref(), + ); } let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?; @@ -4656,7 +4816,14 @@ impl PrimitiveStructuralEncoder { num_values, num_rows ); - Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows) + Self::encode_complex_all_null( + column_idx, + repdef, + row_number, + num_rows, + version, + compression_strategy.as_ref(), + ) }; } @@ -6511,4 +6678,89 @@ mod tests { .with_page_sizes(vec![4096]); check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; } + + #[test] + fn test_encode_decode_complex_all_null_vals_roundtrip() { + use crate::compression::{ + DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy, + }; + + let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::>()); + + let compression_strategy = DefaultCompressionStrategy::default(); + let decompression_strategy = DefaultDecompressionStrategy::default(); + + let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals( + &values, + &compression_strategy, + ) + .unwrap(); + + let decompressor = decompression_strategy + .create_block_decompressor(&encoding) + .unwrap(); + let decompressed = decompressor + .decompress(compressed_buf, values.len() as u64) + .unwrap(); + let decompressed_fixed_width = decompressed.as_fixed_width().unwrap(); + assert_eq!(decompressed_fixed_width.num_values, values.len() as u64); + assert_eq!(decompressed_fixed_width.bits_per_value, 16); + let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::(); + assert_eq!(rep_result.as_ref(), values.as_ref()); + } + + #[tokio::test] + async fn test_complex_all_null_compression_gated_by_version() { + use crate::format::pb21::page_layout::Layout; + use arrow_array::ListArray; + + let list_array = ListArray::from_iter_primitive::( + (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }), + ); + let arr: ArrayRef = Arc::new(list_array); + let field = arrow_schema::Field::new( + "c", + DataType::List(Arc::new(arrow_schema::Field::new( + "item", + DataType::Int32, + true, + ))), + true, + ); + + let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await; + let PageEncoding::Structural(layout_v21) = &page_v21.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else { + panic!("Expected constant layout"); + }; + assert!(layout_v21.rep_compression.is_none()); + assert!(layout_v21.def_compression.is_none()); + assert_eq!(layout_v21.num_rep_values, 0); + assert_eq!(layout_v21.num_def_values, 0); + + let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await; + let PageEncoding::Structural(layout_v22) = &page_v22.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else { + panic!("Expected constant layout"); + }; + assert!(layout_v22.def_compression.is_some()); + assert!(layout_v22.num_def_values > 0); + } + + #[tokio::test] + async fn test_complex_all_null_round_trip() { + use arrow_array::ListArray; + + let list_array = ListArray::from_iter_primitive::( + (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }), + ); + + let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2); + check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new()) + .await; + } } diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 7114f17e31f..90de90d359a 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -678,6 +678,34 @@ impl ProtobufUtils21 { layout: Some(crate::format::pb21::page_layout::Layout::ConstantLayout( crate::format::pb21::ConstantLayout { inline_value: inline_value.map(bytes::Bytes::from), + rep_compression: None, + def_compression: None, + num_rep_values: 0, + num_def_values: 0, + layers: def_meaning + .iter() + .map(|&def| Self::def_inter_to_repdef_layer(def)) + .collect(), + }, + )), + } + } + + pub fn compressed_all_null_constant_layout( + def_meaning: &[DefinitionInterpretation], + rep_compression: Option, + def_compression: Option, + num_rep_values: u64, + num_def_values: u64, + ) -> crate::format::pb21::PageLayout { + crate::format::pb21::PageLayout { + layout: Some(crate::format::pb21::page_layout::Layout::ConstantLayout( + crate::format::pb21::ConstantLayout { + inline_value: None, + rep_compression, + def_compression, + num_rep_values, + num_def_values, layers: def_meaning .iter() .map(|&def| Self::def_inter_to_repdef_layer(def))