From 72d69209dc00986c9228595c596fe65bf0061227 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 4 Dec 2024 11:57:21 -0500 Subject: [PATCH 01/15] packed-struct encoding --- protos/encodings.proto | 6 + rust/lance-encoding/src/data.rs | 57 +++++- rust/lance-encoding/src/decoder.rs | 13 ++ rust/lance-encoding/src/encoder.rs | 8 + .../src/encodings/logical/struct.rs | 29 +++- rust/lance-encoding/src/encodings/physical.rs | 2 + .../src/encodings/physical/packed_struct.rs | 2 +- .../src/encodings/physical/struct_encoding.rs | 163 ++++++++++++++++++ rust/lance-encoding/src/format.rs | 22 ++- rust/lance-encoding/src/statistics.rs | 30 +++- rust/lance-file/src/v2/reader.rs | 36 ++-- 11 files changed, 334 insertions(+), 34 deletions(-) create mode 100644 rust/lance-encoding/src/encodings/physical/struct_encoding.rs diff --git a/protos/encodings.proto b/protos/encodings.proto index cac0d0d5e5f..c5994dfecac 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -258,6 +258,11 @@ message PackedStruct { Buffer buffer = 2; } +message PackedStructFixedWidthMiniBlock { + ArrayEncoding Flat = 1; + repeated uint32 bits_per_values = 2; +} + message FixedSizeBinary { ArrayEncoding bytes = 1; uint32 byte_width = 2; @@ -283,6 +288,7 @@ message ArrayEncoding { BinaryMiniBlock binary_mini_block = 15; FsstMiniBlock fsst_mini_block = 16; BinaryBlock binary_block = 17; + PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18; } } diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index d383d924997..d85ec3b9865 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -341,6 +341,43 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder { } } +struct StructDataBlockBuilder { + children: Vec>, +} + +impl StructDataBlockBuilder { + fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { + let mut children = vec![]; + for bits_per_value in bits_per_values.iter() { + let child = FixedWidthDataBlockBuilder::new(*bits_per_value as u64, estimated_size_bytes); + children.push(Box::new(child) as Box); + } + Self { + children, + } + } +} + +impl DataBlockBuilderImpl for StructDataBlockBuilder { + fn append(&mut self, data_block: &DataBlock, selection: Range) { + let data_block = data_block.as_struct_ref().unwrap(); + for i in 0..self.children.len() { + self.children[i].append(&data_block.children[i], selection.clone()); + } + } + + fn finish(self: Box) -> DataBlock { + let mut children_data_block = Vec::new(); + for child in self.children { + let child_data_block = child.finish(); + children_data_block.push(child_data_block); + } + DataBlock::Struct(StructDataBlock { + children: children_data_block, + block_info: BlockInfo::new(), + }) + } +} /// A data block to represent a fixed size list #[derive(Debug)] pub struct FixedSizeListBlock { @@ -583,6 +620,7 @@ impl VariableWidthBlock { pub struct StructDataBlock { /// The child arrays pub children: Vec, + pub block_info: BlockInfo, } impl StructDataBlock { @@ -616,6 +654,7 @@ impl StructDataBlock { .into_iter() .map(|c| c.remove_validity()) .collect(), + block_info: self.block_info.clone(), } } @@ -633,6 +672,7 @@ impl StructDataBlock { .iter_mut() .map(|c| c.borrow_and_clone()) .collect(), + block_info: self.block_info.clone(), } } @@ -643,8 +683,14 @@ impl StructDataBlock { .iter() .map(|c| c.try_clone()) .collect::>()?, + block_info: self.block_info.clone(), }) } + + pub fn data_size(&self) -> u64 { + self.children.iter() + .map(|data_block| data_block.data_size()).sum() + } } /// A data block for dictionary encoded data @@ -897,6 +943,15 @@ impl DataBlock { inner.dimension, )) } + Self::Struct(struct_data_block) => { + let mut bits_per_values = vec![]; + for child in struct_data_block.children.iter() { + let child = child.as_fixed_width_ref(). + expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported."); + bits_per_values.push(child.bits_per_value as u32); + } + Box::new(StructDataBlockBuilder::new(bits_per_values, estimated_size_bytes)) + } _ => todo!(), } } @@ -1356,7 +1411,7 @@ impl DataBlock { .collect::>(); children.push(Self::from_arrays(&child_vec, num_values)); } - Self::Struct(StructDataBlock { children }) + Self::Struct(StructDataBlock { children, block_info: BlockInfo::default() }) } DataType::FixedSizeList(_, dim) => { let children = arrays diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 552abd68017..96326d6f6e2 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -251,6 +251,7 @@ use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor; use crate::encodings::physical::fsst::FsstMiniBlockDecompressor; use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; +use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; use crate::repdef::{LevelBuffer, RepDefUnraveler}; @@ -510,6 +511,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy { pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => { Ok(Box::new(FsstMiniBlockDecompressor::new(description))) } + pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => { + Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(&description))) + } _ => todo!(), } } @@ -755,6 +759,15 @@ impl CoreFieldDecoderStrategy { } match &data_type { DataType::Struct(fields) => { + let field_metadata = &field.metadata; + if field_metadata.get("packed").map(|v| v == "true").unwrap_or(false) { + let column_info = column_infos.expect_next()?; + let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( + column_info.as_ref(), + self.decompressor_strategy.as_ref(), + )?); + return Ok(scheduler); + } let mut child_schedulers = Vec::with_capacity(field.children.len()); for field in field.children.iter() { let field_scheduler = diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 352c41cb768..971bb8fd93d 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -33,6 +33,7 @@ use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; use crate::encodings::physical::fixed_size_list::FslPerValueCompressor; use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder}; use crate::encodings::physical::packed_struct::PackedStructEncoder; +use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder; use crate::format::ProtobufUtils; use crate::repdef::RepDefBuilder; use crate::statistics::{GetStat, Stat}; @@ -822,6 +823,13 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { return Ok(Box::new(BinaryMiniBlockEncoder::default())); } } + if let DataBlock::Struct(ref struct_data_block) = data { + if struct_data_block.children.iter() + .any(|child|!matches!(child, DataBlock::FixedWidth(_))) { + panic!("packed struct encoding currenlty only supports fixed-width fields.") + } + return Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default())); + } Ok(Box::new(ValueEncoder::default())) } diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index a4cc44afc71..4ec2720ff43 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -587,15 +587,26 @@ pub struct StructuralStructDecoder { impl StructuralStructDecoder { pub fn new(fields: Fields, should_validate: bool) -> Self { - let children = fields - .iter() - .map(|field| Self::field_to_decoder(field, should_validate)) - .collect(); - let data_type = DataType::Struct(fields.clone()); - Self { - data_type, - children, - child_fields: fields, + let field0_metadata = fields[0].metadata(); + if field0_metadata.get("packed").map(|v| v == "true").unwrap_or(false) { + let data_type = DataType::Struct(fields.clone()); + let child = StructuralPrimitiveFieldDecoder::new(&fields[0], should_validate); + Self { + data_type, + children: vec![Box::new(child)], + child_fields: fields, + } + } else { + let children = fields + .iter() + .map(|field| Self::field_to_decoder(field, should_validate)) + .collect(); + let data_type = DataType::Struct(fields.clone()); + Self { + data_type, + children, + child_fields: fields, + } } } diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index a108b679e16..8f5f76c787e 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -29,6 +29,7 @@ pub mod fixed_size_binary; pub mod fixed_size_list; pub mod fsst; pub mod packed_struct; +pub mod struct_encoding; pub mod value; /// These contain the file buffers shared across the entire file @@ -287,6 +288,7 @@ pub fn decoder_from_array_encoding( pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(), pb::array_encoding::ArrayEncoding::FsstMiniBlock(_) => unreachable!(), pb::array_encoding::ArrayEncoding::BinaryBlock(_) => unreachable!(), + pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(_) => unreachable!(), } } diff --git a/rust/lance-encoding/src/encodings/physical/packed_struct.rs b/rust/lance-encoding/src/encodings/physical/packed_struct.rs index 4feca6d9c4c..73b2baa4c58 100644 --- a/rust/lance-encoding/src/encodings/physical/packed_struct.rs +++ b/rust/lance-encoding/src/encodings/physical/packed_struct.rs @@ -151,7 +151,7 @@ impl PrimitivePageDecoder for PackedStructPageDecoder { let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type()); children.push(child_block); } - Ok(DataBlock::Struct(StructDataBlock { children })) + Ok(DataBlock::Struct(StructDataBlock { children, block_info: BlockInfo::default() })) } } diff --git a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs new file mode 100644 index 00000000000..b0115941b51 --- /dev/null +++ b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use arrow::datatypes::UInt64Type; + +use lance_core::{Error, Result}; +use snafu::{location, Location}; + +use crate::{ + buffer::LanceBuffer, + data::{ + BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock, + }, + decoder::MiniBlockDecompressor, + encoder::{MiniBlockCompressed, MiniBlockCompressor}, + format::{ + pb::{self}, + ProtobufUtils, + }, + statistics::{GetStat, Stat}, +}; + +use super::value::{ValueDecompressor, ValueEncoder}; + +fn struct_data_block_to_fixed_width_data_block( + struct_data_block: StructDataBlock, + bits_per_values: &[u32], +) -> DataBlock { + let data_size = struct_data_block.expect_single_stat::(Stat::DataSize); + let mut output = Vec::with_capacity(data_size as usize); + let num_values = struct_data_block.children[0].num_values(); + + for i in 0..num_values as usize { + for (j, child) in struct_data_block.children.iter().enumerate() { + let bytes_per_value = (bits_per_values[j] / 8) as usize; + let this_data = child + .as_fixed_width_ref() + .unwrap() + .data + .slice_with_length(bytes_per_value * i, bytes_per_value); + output.extend_from_slice(&this_data); + } + } + + DataBlock::FixedWidth(FixedWidthDataBlock { + bits_per_value: bits_per_values + .iter() + .map(|bits_per_value| *bits_per_value as u64) + .sum(), + data: LanceBuffer::Owned(output), + num_values, + block_info: BlockInfo::default(), + }) +} + +#[derive(Debug, Default)] +pub struct PackedStructFixedWidthMiniBlockEncoder {} + +impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder { + fn compress( + &self, + data: DataBlock, + ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> { + match data { + DataBlock::Struct(struct_data_block) => { + let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::>(); + let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values); + + let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box; + let (value_miniblock_compressed, value_array_encoding) = + value_miniblock_compressor.compress(data_block)?; + println!("MiniBlockCompressed.num_values: {}", value_miniblock_compressed.num_values); + Ok(( + value_miniblock_compressed, + ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values), + )) + } + _ => Err(Error::InvalidInput { + source: format!( + "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder", + data.name() + ) + .into(), + location: location!(), + }), + } + } +} + +#[derive(Debug)] +pub struct PackedStructFixedWidthMiniBlockDecompressor { + bits_per_values: Vec, + array_encoding: Box, +} + +impl PackedStructFixedWidthMiniBlockDecompressor { + pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self { + let array_encoding: Box = match description + .flat + .as_ref() + .unwrap() + .array_encoding + .as_ref() + .unwrap() + { + pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)), + _ => panic!("Unsupported array encoding"), + }; + Self { + bits_per_values: description.bits_per_values.clone(), + array_encoding, + } + } +} + +impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + let encoded_data_block = self.array_encoding.decompress(data, num_values)?; + let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else { + panic!("ValueDecompressor should output FixedWidth DataBlock") + }; + + let bytes_per_values = self + .bits_per_values + .iter() + .map(|bits_per_value| *bits_per_value as usize / 8) + .collect::>(); + + assert!(encoded_data_block.bits_per_value % 8 == 0); + + let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize; + let mut prefix_sum = vec![0; self.bits_per_values.len() + 1]; + for i in 0..self.bits_per_values.len() { + prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i]; + } + + let mut children_data_block = vec![]; + for i in 0..self.bits_per_values.len() { + let child_buf_size = bytes_per_values[i] * num_values as usize; + let mut child_buf: Vec = Vec::with_capacity(child_buf_size); + + for j in 0..num_values as usize { + let this_value = encoded_data_block.data.slice_with_length( + prefix_sum[i] + (j * encoded_bytes_per_row), + bytes_per_values[i], + ); + child_buf.extend_from_slice(&this_value); + } + + let child = DataBlock::FixedWidth(FixedWidthDataBlock { + data: LanceBuffer::Owned(child_buf), + bits_per_value: self.bits_per_values[i] as u64, + num_values, + block_info: BlockInfo::default(), + }); + children_data_block.push(child); + } + Ok(DataBlock::Struct(StructDataBlock { + children: children_data_block, + block_info: BlockInfo::default(), + })) + } +} \ No newline at end of file diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index cf90c49ab2f..96ccac3bc23 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -15,13 +15,7 @@ pub mod pb { } use pb::{ - array_encoding::ArrayEncoding as ArrayEncodingEnum, - buffer::BufferType, - nullable::{AllNull, NoNull, Nullability, SomeNull}, - page_layout::Layout, - AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, - BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, - MiniBlockLayout, Nullable, PackedStruct, PageLayout, + array_encoding::ArrayEncoding as ArrayEncodingEnum, buffer::BufferType, nullable::{AllNull, NoNull, Nullability, SomeNull}, page_layout::Layout, AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout }; use crate::encodings::physical::block_compress::CompressionConfig; @@ -172,6 +166,20 @@ impl ProtobufUtils { } } + pub fn packed_struct_fixed_width_mini_block( + data: ArrayEncoding, + bits_per_values: Vec, + ) -> ArrayEncoding { + ArrayEncoding { + array_encoding: Some(ArrayEncodingEnum::PackedStructFixedWidthMiniBlock( + Box::new(PackedStructFixedWidthMiniBlock { + flat: Some(Box::new(data)), + bits_per_values, + }), + )), + } + } + pub fn binary( indices_encoding: ArrayEncoding, bytes_encoding: ArrayEncoding, diff --git a/rust/lance-encoding/src/statistics.rs b/rust/lance-encoding/src/statistics.rs index cbd63177c07..d07fe574c8f 100644 --- a/rust/lance-encoding/src/statistics.rs +++ b/rust/lance-encoding/src/statistics.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use arrow::array::AsArray; +use arrow::{array::AsArray, datatypes::UInt64Type}; use arrow_array::{Array, ArrowPrimitiveType, UInt64Array}; use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use num_traits::PrimInt; @@ -61,7 +61,7 @@ impl ComputeStat for DataBlock { Self::FixedSizeList(_) => {} Self::VariableWidth(data_block) => data_block.compute_stat(), Self::Opaque(data_block) => data_block.compute_stat(), - Self::Struct(_) => {} + Self::Struct(data_block) => data_block.compute_stat(), Self::Dictionary(_) => {} } } @@ -371,8 +371,30 @@ impl GetStat for DictionaryDataBlock { } impl GetStat for StructDataBlock { - fn get_stat(&self, _stat: Stat) -> Option> { - None + fn get_stat(&self, stat: Stat) -> Option> { + let block_info = self.block_info.0.read().unwrap(); + if block_info.is_empty() { + panic!("get_stat should be called after statistics are computed.") + } + block_info.get(&stat).cloned() + } +} + +impl ComputeStat for StructDataBlock { + fn compute_stat(&mut self) { + let data_size = self.data_size(); + let data_size_array = Arc::new(UInt64Array::from(vec![data_size])); + + let max_len = self + .children + .iter() + .map(|child| child.expect_single_stat::(Stat::MaxLength)) + .sum::(); + let max_len_array = Arc::new(UInt64Array::from(vec![max_len])); + + let mut info = self.block_info.0.write().unwrap(); + info.insert(Stat::DataSize, data_size_array); + info.insert(Stat::MaxLength, max_len_array); } } diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 172bc6f41cc..da3db828bee 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -218,23 +218,35 @@ impl ReaderProjection { /// /// If the schema provided is not the schema of the entire file then /// the projection will be invalid and the read will fail. + /// If the field is a `struct datatype` with `packed` set to true in the field metadata, + /// the whole struct has one column index. + /// To support nested `packed-struct encoding`, this method need to be further adjusted. pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self { let schema = Arc::new(schema.clone()); let is_structural = version >= LanceFileVersion::V2_1; - let mut counter = 0; - let counter = &mut counter; - let column_indices = schema - .fields_pre_order() - .filter_map(|field| { + let mut column_indices = vec![]; + let mut curr_column_idx = 0; + let mut packed_struct_fields_num = 0; + for field in schema.fields_pre_order() { + let field_metadata = &field.metadata; + if packed_struct_fields_num > 0 { + packed_struct_fields_num -= 1; + continue; + } + if field_metadata + .get("packed") + .map(|v| v == "true") + .unwrap_or(false) { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + packed_struct_fields_num = field.children.len(); + } else { if field.children.is_empty() || !is_structural { - let col_idx = *counter; - *counter += 1; - Some(col_idx) - } else { - None + column_indices.push(curr_column_idx); + curr_column_idx += 1; } - }) - .collect::>(); + } + } Self { schema, column_indices, From 409a465d871a6a4557e0bd02017e7bcf3b0a9588 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 4 Dec 2024 16:13:52 -0500 Subject: [PATCH 02/15] correct `estimated_bytes` computation --- rust/lance-encoding/src/data.rs | 30 +++++++++----- rust/lance-encoding/src/decoder.rs | 12 ++++-- rust/lance-encoding/src/encoder.rs | 11 ++++-- .../src/encodings/logical/struct.rs | 6 ++- .../src/encodings/physical/packed_struct.rs | 30 ++++++++++---- .../src/encodings/physical/struct_encoding.rs | 7 +--- rust/lance-encoding/src/format.rs | 8 +++- rust/lance-encoding/src/statistics.rs | 39 ++++++++++++++----- rust/lance-file/src/v2/reader.rs | 17 ++++---- 9 files changed, 112 insertions(+), 48 deletions(-) diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index d85ec3b9865..be6069a7f50 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -348,13 +348,17 @@ struct StructDataBlockBuilder { impl StructDataBlockBuilder { fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { let mut children = vec![]; + let total_bits: u32 = bits_per_values.iter().sum(); + let total_bits = total_bits as u64; + for bits_per_value in bits_per_values.iter() { - let child = FixedWidthDataBlockBuilder::new(*bits_per_value as u64, estimated_size_bytes); + let child = FixedWidthDataBlockBuilder::new( + *bits_per_value as u64, + estimated_size_bytes * (*bits_per_value as u64 + total_bits - 1) / total_bits, + ); children.push(Box::new(child) as Box); } - Self { - children, - } + Self { children } } } @@ -654,7 +658,7 @@ impl StructDataBlock { .into_iter() .map(|c| c.remove_validity()) .collect(), - block_info: self.block_info.clone(), + block_info: self.block_info, } } @@ -688,8 +692,10 @@ impl StructDataBlock { } pub fn data_size(&self) -> u64 { - self.children.iter() - .map(|data_block| data_block.data_size()).sum() + self.children + .iter() + .map(|data_block| data_block.data_size()) + .sum() } } @@ -950,7 +956,10 @@ impl DataBlock { expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported."); bits_per_values.push(child.bits_per_value as u32); } - Box::new(StructDataBlockBuilder::new(bits_per_values, estimated_size_bytes)) + Box::new(StructDataBlockBuilder::new( + bits_per_values, + estimated_size_bytes, + )) } _ => todo!(), } @@ -1411,7 +1420,10 @@ impl DataBlock { .collect::>(); children.push(Self::from_arrays(&child_vec, num_values)); } - Self::Struct(StructDataBlock { children, block_info: BlockInfo::default() }) + Self::Struct(StructDataBlock { + children, + block_info: BlockInfo::default(), + }) } DataType::FixedSizeList(_, dim) => { let children = arrays diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 96326d6f6e2..4768c4673a3 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -250,8 +250,8 @@ use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBloc use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor; use crate::encodings::physical::fsst::FsstMiniBlockDecompressor; -use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor; +use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; use crate::repdef::{LevelBuffer, RepDefUnraveler}; @@ -512,7 +512,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy { Ok(Box::new(FsstMiniBlockDecompressor::new(description))) } pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => { - Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(&description))) + Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new( + description, + ))) } _ => todo!(), } @@ -760,7 +762,11 @@ impl CoreFieldDecoderStrategy { match &data_type { DataType::Struct(fields) => { let field_metadata = &field.metadata; - if field_metadata.get("packed").map(|v| v == "true").unwrap_or(false) { + if field_metadata + .get("packed") + .map(|v| v == "true") + .unwrap_or(false) + { let column_info = column_infos.expect_next()?; let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( column_info.as_ref(), diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 971bb8fd93d..5203bd3f7ce 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -824,10 +824,13 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { } } if let DataBlock::Struct(ref struct_data_block) = data { - if struct_data_block.children.iter() - .any(|child|!matches!(child, DataBlock::FixedWidth(_))) { - panic!("packed struct encoding currenlty only supports fixed-width fields.") - } + if struct_data_block + .children + .iter() + .any(|child| !matches!(child, DataBlock::FixedWidth(_))) + { + panic!("packed struct encoding currently only supports fixed-width fields.") + } return Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default())); } Ok(Box::new(ValueEncoder::default())) diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 4ec2720ff43..62a2898c409 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -588,7 +588,11 @@ pub struct StructuralStructDecoder { impl StructuralStructDecoder { pub fn new(fields: Fields, should_validate: bool) -> Self { let field0_metadata = fields[0].metadata(); - if field0_metadata.get("packed").map(|v| v == "true").unwrap_or(false) { + if field0_metadata + .get("packed") + .map(|v| v == "true") + .unwrap_or(false) + { let data_type = DataType::Struct(fields.clone()); let child = StructuralPrimitiveFieldDecoder::new(&fields[0], should_validate); Self { diff --git a/rust/lance-encoding/src/encodings/physical/packed_struct.rs b/rust/lance-encoding/src/encodings/physical/packed_struct.rs index 73b2baa4c58..a513b5316b5 100644 --- a/rust/lance-encoding/src/encodings/physical/packed_struct.rs +++ b/rust/lance-encoding/src/encodings/physical/packed_struct.rs @@ -151,7 +151,10 @@ impl PrimitivePageDecoder for PackedStructPageDecoder { let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type()); children.push(child_block); } - Ok(DataBlock::Struct(StructDataBlock { children, block_info: BlockInfo::default() })) + Ok(DataBlock::Struct(StructDataBlock { + children, + block_info: BlockInfo::default(), + })) } } @@ -266,9 +269,13 @@ pub mod tests { testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases}, version::LanceFileVersion, }; + use rstest::rstest; + #[rstest] #[test_log::test(tokio::test)] - async fn test_random_packed_struct() { + async fn test_random_packed_struct( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { let data_type = DataType::Struct(Fields::from(vec![ Field::new("a", DataType::UInt64, false), Field::new("b", DataType::UInt32, false), @@ -278,11 +285,14 @@ pub mod tests { let field = Field::new("", data_type, false).with_metadata(metadata); - check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await; + check_round_trip_encoding_random(field, version).await; } + #[rstest] #[test_log::test(tokio::test)] - async fn test_specific_packed_struct() { + async fn test_specific_packed_struct( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { let array1 = Arc::new(UInt64Array::from(vec![1, 2, 3, 4])); let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8])); let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12])); @@ -325,7 +335,8 @@ pub mod tests { .with_range(0..2) .with_range(0..6) .with_range(1..4) - .with_indices(vec![1, 3, 7]); + .with_indices(vec![1, 3, 7]) + .with_file_version(version); let mut metadata = HashMap::new(); metadata.insert("packed".to_string(), "true".to_string()); @@ -338,8 +349,12 @@ pub mod tests { .await; } + // the current Lance V2.1 `packed-struct encoding` doesn't support `fixed size list`. + #[rstest] #[test_log::test(tokio::test)] - async fn test_fsl_packed_struct() { + async fn test_fsl_packed_struct( + #[values(LanceFileVersion::V2_0, /*LanceFileVersion::V2_1)*/)] version: LanceFileVersion, + ) { let int_array = Arc::new(Int32Array::from(vec![12, 13, 14, 15])); let list_data_type = @@ -367,7 +382,8 @@ pub mod tests { .with_range(1..3) .with_range(0..1) .with_range(2..4) - .with_indices(vec![0, 2, 3]); + .with_indices(vec![0, 2, 3]) + .with_file_version(version); let mut metadata = HashMap::new(); metadata.insert("packed".to_string(), "true".to_string()); diff --git a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs index b0115941b51..0d323fc868b 100644 --- a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs +++ b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs @@ -8,9 +8,7 @@ use snafu::{location, Location}; use crate::{ buffer::LanceBuffer, - data::{ - BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock, - }, + data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock}, decoder::MiniBlockDecompressor, encoder::{MiniBlockCompressed, MiniBlockCompressor}, format::{ @@ -69,7 +67,6 @@ impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder { let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box; let (value_miniblock_compressed, value_array_encoding) = value_miniblock_compressor.compress(data_block)?; - println!("MiniBlockCompressed.num_values: {}", value_miniblock_compressed.num_values); Ok(( value_miniblock_compressed, ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values), @@ -160,4 +157,4 @@ impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor { block_info: BlockInfo::default(), })) } -} \ No newline at end of file +} diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 96ccac3bc23..4005c02e250 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -15,7 +15,13 @@ pub mod pb { } use pb::{ - array_encoding::ArrayEncoding as ArrayEncodingEnum, buffer::BufferType, nullable::{AllNull, NoNull, Nullability, SomeNull}, page_layout::Layout, AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout + array_encoding::ArrayEncoding as ArrayEncodingEnum, + buffer::BufferType, + nullable::{AllNull, NoNull, Nullability, SomeNull}, + page_layout::Layout, + AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, + BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, + MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout, }; use crate::encodings::physical::block_compress::CompressionConfig; diff --git a/rust/lance-encoding/src/statistics.rs b/rust/lance-encoding/src/statistics.rs index d07fe574c8f..99b07a0aafc 100644 --- a/rust/lance-encoding/src/statistics.rs +++ b/rust/lance-encoding/src/statistics.rs @@ -13,8 +13,8 @@ use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use num_traits::PrimInt; use crate::data::{ - AllNullDataBlock, DataBlock, DictionaryDataBlock, FixedWidthDataBlock, NullableDataBlock, - OpaqueBlock, StructDataBlock, VariableWidthBlock, + AllNullDataBlock, DataBlock, DictionaryDataBlock, FixedSizeListBlock, FixedWidthDataBlock, + NullableDataBlock, OpaqueBlock, StructDataBlock, VariableWidthBlock, }; #[derive(Clone, Copy, PartialEq, Eq, Hash)] @@ -156,7 +156,7 @@ impl GetStat for DataBlock { Self::AllNull(data_block) => data_block.get_stat(stat), Self::Nullable(data_block) => data_block.get_stat(stat), Self::FixedWidth(data_block) => data_block.get_stat(stat), - Self::FixedSizeList(_) => None, + Self::FixedSizeList(data_block) => data_block.get_stat(stat), Self::VariableWidth(data_block) => data_block.get_stat(stat), Self::Opaque(data_block) => data_block.get_stat(stat), Self::Struct(data_block) => data_block.get_stat(stat), @@ -165,6 +165,19 @@ impl GetStat for DataBlock { } } +impl GetStat for FixedSizeListBlock { + fn get_stat(&self, stat: Stat) -> Option> { + match stat { + Stat::MaxLength => { + let max_len = self.dimension * self.child.expect_single_stat::(stat); + Some(Arc::new(UInt64Array::from(vec![max_len]))) + } + Stat::DataSize => self.child.get_stat(stat), + _ => None, + } + } +} + // NullableDataBlock will be deprecated in Lance 2.1. impl GetStat for NullableDataBlock { // This function simply returns the statistics of the inner `DataBlock` of `NullableDataBlock`, @@ -416,6 +429,7 @@ mod tests { use super::DataBlock; use arrow::{ + array::AsArray, compute::concat, datatypes::{Int32Type, UInt64Type}, }; @@ -464,18 +478,25 @@ mod tests { let fields = vec![ Arc::new(Field::new("int_field", DataType::Int32, false)), Arc::new(Field::new("float_field", DataType::Float32, false)), - Arc::new(Field::new( - "fsl_field", - DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5), - false, - )), ] .into(); let mut gen = lance_datagen::array::rand_type(&DataType::Struct(fields)); let arr = gen.generate(RowCount::from(3), &mut rng).unwrap(); let block = DataBlock::from_array(arr.clone()); - assert!(block.get_stat(Stat::DataSize).is_none()); + let (_, arr_parts, _) = arr.as_struct().clone().into_parts(); + let total_buffer_size: usize = arr_parts + .iter() + .map(|arr| { + arr.to_data() + .buffers() + .iter() + .map(|buffer| buffer.len()) + .sum::() + }) + .sum(); + let data_size = block.expect_single_stat::(Stat::DataSize); + assert!(data_size == total_buffer_size as u64); // test DataType::Dictionary let mut gen = array::rand_type(&DataType::Dictionary( diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index da3db828bee..e3d4bea42d1 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -236,15 +236,14 @@ impl ReaderProjection { if field_metadata .get("packed") .map(|v| v == "true") - .unwrap_or(false) { - column_indices.push(curr_column_idx); - curr_column_idx += 1; - packed_struct_fields_num = field.children.len(); - } else { - if field.children.is_empty() || !is_structural { - column_indices.push(curr_column_idx); - curr_column_idx += 1; - } + .unwrap_or(false) + { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + packed_struct_fields_num = field.children.len(); + } else if field.children.is_empty() || !is_structural { + column_indices.push(curr_column_idx); + curr_column_idx += 1; } } Self { From cd8343e84e69d6e72d4a3f07359eee8b2bf8c55f Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 10:01:46 -0500 Subject: [PATCH 03/15] a workable version for schema with more than one columns --- .../src/encodings/logical/struct.rs | 46 +++++++++---------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 62a2898c409..8f76f8ad269 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -587,30 +587,15 @@ pub struct StructuralStructDecoder { impl StructuralStructDecoder { pub fn new(fields: Fields, should_validate: bool) -> Self { - let field0_metadata = fields[0].metadata(); - if field0_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { - let data_type = DataType::Struct(fields.clone()); - let child = StructuralPrimitiveFieldDecoder::new(&fields[0], should_validate); - Self { - data_type, - children: vec![Box::new(child)], - child_fields: fields, - } - } else { - let children = fields - .iter() - .map(|field| Self::field_to_decoder(field, should_validate)) - .collect(); - let data_type = DataType::Struct(fields.clone()); - Self { - data_type, - children, - child_fields: fields, - } + let children = fields + .iter() + .map(|field| Self::field_to_decoder(field, should_validate)) + .collect(); + let data_type = DataType::Struct(fields.clone()); + Self { + data_type, + children, + child_fields: fields, } } @@ -619,7 +604,18 @@ impl StructuralStructDecoder { should_validate: bool, ) -> Box { match field.data_type() { - DataType::Struct(fields) => Box::new(Self::new(fields.clone(), should_validate)), + DataType::Struct(fields) => { + let field_metadata = field.metadata(); + if field_metadata + .get("packed") + .map(|v| v == "true") + .unwrap_or(false) { + let decoder = StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); + Box::new(decoder) + } else { + Box::new(Self::new(fields.clone(), should_validate)) + } + } DataType::List(_) | DataType::LargeList(_) => todo!(), DataType::RunEndEncoded(_, _) => todo!(), DataType::ListView(_) | DataType::LargeListView(_) => todo!(), From 90f82e7717ec7dbbedd59f9dea7519ad799c8343 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 10:21:49 -0500 Subject: [PATCH 04/15] remove fixed-size-list field in benchmark script --- python/python/benchmarks/test_packed_struct.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 037470f01ce..84ea57860ed 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -33,13 +33,10 @@ def test_data(tmp_path_factory): [ pc.random(NUM_ROWS).cast(pa.float32()), pa.array(range(NUM_ROWS), type=pa.int32()), - pa.FixedSizeListArray.from_arrays( - pc.random(NUM_ROWS * 5).cast(pa.float32()), 5 - ), pa.array(range(NUM_ROWS), type=pa.int32()), pa.array(range(NUM_ROWS), type=pa.int32()), ], - ["f", "i", "fsl", "i2", "i3"], + ["f", "i", "i2", "i3"], ) } ) @@ -51,6 +48,7 @@ def test_data(tmp_path_factory): @pytest.fixture(scope="module") def random_indices(): random_indices = [random.randint(0, NUM_ROWS) for _ in range(NUM_INDICES)] + random_indices.sort() return random_indices @@ -127,7 +125,7 @@ def test_parquet_write(tmp_path: Path, benchmark, test_data): def write_lance_file(lance_path, test_data): - with LanceFileWriter(lance_path, test_data.schema) as writer: + with LanceFileWriter(lance_path, test_data.schema, version = "2.1") as writer: for batch in test_data.to_batches(): writer.write_batch(batch) From 5bfc8c81787f00e2bbb413f52cbf8161c1bfa6be Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 11:07:04 -0500 Subject: [PATCH 05/15] remove the added statistics gathering for FixedSizeListBlock --- rust/lance-encoding/src/encodings/logical/struct.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 8f76f8ad269..4654949443a 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -609,9 +609,11 @@ impl StructuralStructDecoder { if field_metadata .get("packed") .map(|v| v == "true") - .unwrap_or(false) { - let decoder = StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); - Box::new(decoder) + .unwrap_or(false) + { + let decoder = + StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); + Box::new(decoder) } else { Box::new(Self::new(fields.clone(), should_validate)) } From ba4c6d689e810a814e577fe7cdcc8a31ca47e7a5 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 11:43:49 -0500 Subject: [PATCH 06/15] fix merge error --- rust/lance-encoding/src/data.rs | 1 + rust/lance-encoding/src/encodings/logical/struct.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 2f4be62e2dc..26e5dcd3e0c 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -343,6 +343,7 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder { } } +#[derive(Debug)] struct StructDataBlockBuilder { children: Vec>, } diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 3c320e42a5b..88cec04d327 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -618,7 +618,7 @@ impl StructuralStructDecoder { StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); Box::new(decoder) } else { - Box::new(Self::new(fields.clone(), should_validate)) + Box::new(Self::new(fields.clone(), should_validate, false)) } } DataType::List(_) | DataType::LargeList(_) => todo!(), From ce89ff1240418bf4a6d13a8814527abfef222c19 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 12:56:25 -0500 Subject: [PATCH 07/15] fmt --- rust/lance-encoding/src/format.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 9ad9b7d3f6f..2ac07f22d44 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -21,7 +21,8 @@ use pb::{ page_layout::Layout, AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, - MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout, RepDefLayer, + MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout, + RepDefLayer, }; use crate::{ From 9a5a13f4ebf6b83ed35bb3322372407f0d4ccfbc Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 5 Dec 2024 13:17:28 -0500 Subject: [PATCH 08/15] fmt --- python/python/benchmarks/test_packed_struct.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 84ea57860ed..5d5cd8c3cac 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -13,7 +13,7 @@ trace_to_chrome(level="debug", file="/tmp/trace.json") NUM_ROWS = 10_000_000 -RANDOM_ACCESS = "indices" +RANDOM_ACCESS = "full" NUM_INDICES = 100 NUM_ROUNDS = 10 @@ -125,7 +125,7 @@ def test_parquet_write(tmp_path: Path, benchmark, test_data): def write_lance_file(lance_path, test_data): - with LanceFileWriter(lance_path, test_data.schema, version = "2.1") as writer: + with LanceFileWriter(lance_path, test_data.schema, version="2.1") as writer: for batch in test_data.to_batches(): writer.write_batch(batch) From c38ed701689a05d4eca060cc68899da8c1a27ef1 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Fri, 6 Dec 2024 18:07:36 -0500 Subject: [PATCH 09/15] fix bug with `create_structural_field_scheduler` --- rust/lance-encoding/src/decoder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index fd70067601e..dec88cec811 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -772,6 +772,7 @@ impl CoreFieldDecoderStrategy { column_info.as_ref(), self.decompressor_strategy.as_ref(), )?); + column_infos.next_top_level(); return Ok(scheduler); } let mut child_schedulers = Vec::with_capacity(field.children.len()); From c28b3cf2cb1350a0d9aadcf440b6726fd3f8e5ab Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Mon, 9 Dec 2024 09:15:03 -0500 Subject: [PATCH 10/15] correct `estimate size` in StructDataBlockBuilder. --- python/python/benchmarks/test_packed_struct.py | 2 +- rust/lance-encoding/src/data.rs | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 5d5cd8c3cac..77638d9575a 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -13,7 +13,7 @@ trace_to_chrome(level="debug", file="/tmp/trace.json") NUM_ROWS = 10_000_000 -RANDOM_ACCESS = "full" +RANDOM_ACCESS = "indices" NUM_INDICES = 100 NUM_ROUNDS = 10 diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 26e5dcd3e0c..e2fee0e175f 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -349,15 +349,19 @@ struct StructDataBlockBuilder { } impl StructDataBlockBuilder { + // Currently only Struct with fixed-width fields are supported. + // And the assumption that all fields have `bits_per_value % 8 == 0` is made here. fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { let mut children = vec![]; - let total_bits: u32 = bits_per_values.iter().sum(); - let total_bits = total_bits as u64; + + let bytes_per_row: u32 = bits_per_values.iter().sum::() / 8; + let bytes_per_row = bytes_per_row as u64; for bits_per_value in bits_per_values.iter() { + let this_estimated_size_bytes = estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8; let child = FixedWidthDataBlockBuilder::new( *bits_per_value as u64, - estimated_size_bytes * (*bits_per_value as u64 + total_bits - 1) / total_bits, + this_estimated_size_bytes, ); children.push(Box::new(child) as Box); } From 40e29c9da58a074314746887818685f5dde2bf7c Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Mon, 9 Dec 2024 09:32:32 -0500 Subject: [PATCH 11/15] add check in `PrimitiveStructuralEncoder::do_flush` to make sure fields of struct is fixed-width data block when using packed-struct encoding. --- rust/lance-encoding/src/data.rs | 11 +++++------ rust/lance-encoding/src/decoder.rs | 6 ++++++ rust/lance-encoding/src/encoder.rs | 2 ++ .../src/encodings/logical/primitive.rs | 14 +++++++++++++- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index e2fee0e175f..bbdb477afe7 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -350,7 +350,7 @@ struct StructDataBlockBuilder { impl StructDataBlockBuilder { // Currently only Struct with fixed-width fields are supported. - // And the assumption that all fields have `bits_per_value % 8 == 0` is made here. + // And the assumption that all fields have `bits_per_value % 8 == 0` is made here. fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { let mut children = vec![]; @@ -358,11 +358,10 @@ impl StructDataBlockBuilder { let bytes_per_row = bytes_per_row as u64; for bits_per_value in bits_per_values.iter() { - let this_estimated_size_bytes = estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8; - let child = FixedWidthDataBlockBuilder::new( - *bits_per_value as u64, - this_estimated_size_bytes, - ); + let this_estimated_size_bytes = + estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8; + let child = + FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes); children.push(Box::new(child) as Box); } Self { children } diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index dec88cec811..b4dffc81a61 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -756,7 +756,10 @@ impl CoreFieldDecoderStrategy { column_info.as_ref(), self.decompressor_strategy.as_ref(), )?); + + // advance to the next top level column column_infos.next_top_level(); + return Ok(scheduler); } match &data_type { @@ -772,7 +775,10 @@ impl CoreFieldDecoderStrategy { column_info.as_ref(), self.decompressor_strategy.as_ref(), )?); + + // advance to the next top level column column_infos.next_top_level(); + return Ok(scheduler); } let mut child_schedulers = Vec::with_capacity(field.children.len()); diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 5203bd3f7ce..ba69944119e 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -824,6 +824,8 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { } } if let DataBlock::Struct(ref struct_data_block) = data { + // this condition is actually checked at `PrimitiveStructualEncoder::do_flush`, + // just being cautious here. if struct_data_block .children .iter() diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index a91e9b5bf79..5621e8ca71d 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -21,7 +21,7 @@ use lance_core::utils::hash::U8SliceKey; use log::{debug, trace}; use snafu::{location, Location}; -use crate::data::{AllNullDataBlock, DataBlock, VariableWidthBlock}; +use crate::data::{AllNullDataBlock, DataBlock, StructDataBlock, VariableWidthBlock}; use crate::decoder::PerValueDecompressor; use crate::encoder::PerValueDataBlock; use crate::repdef::{ @@ -2466,6 +2466,18 @@ impl PrimitiveStructuralEncoder { Self::encode_simple_all_null(column_idx, num_values, row_number) } else { let data_block = DataBlock::from_arrays(&arrays, num_values); + + // if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding. + if let DataBlock::Struct(ref struct_data_block) = data_block { + if struct_data_block + .children + .iter() + .any(|child| !matches!(child, DataBlock::FixedWidth(_))) + { + panic!("packed struct encoding currently only supports fixed-width fields.") + } + } + const DICTIONARY_ENCODING_THRESHOLD: u64 = 100; let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) { From f54c87371bda2274bf0a45bd4e2938780ceab086 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Mon, 9 Dec 2024 15:20:48 -0500 Subject: [PATCH 12/15] use random float number in test_packed_struct.py --- .../python/benchmarks/test_packed_struct.py | 22 +++++++++++++------ rust/lance-encoding/src/encoder.rs | 2 +- .../src/encodings/physical/struct_encoding.rs | 3 +++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 77638d9575a..5131c440271 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -14,8 +14,10 @@ NUM_ROWS = 10_000_000 RANDOM_ACCESS = "indices" -NUM_INDICES = 100 +NUM_INDICES = 1000 NUM_ROUNDS = 10 +#BATCH_SIZE = 16 * 1024 +BATCH_SIZE = 1000 # This file compares benchmarks for reading and writing a StructArray column using # (i) parquet @@ -31,10 +33,10 @@ def test_data(tmp_path_factory): { "struct_col": pa.StructArray.from_arrays( [ - pc.random(NUM_ROWS).cast(pa.float32()), - pa.array(range(NUM_ROWS), type=pa.int32()), - pa.array(range(NUM_ROWS), type=pa.int32()), - pa.array(range(NUM_ROWS), type=pa.int32()), + pc.random(NUM_ROWS).cast(pa.float32()), # f + pc.random(NUM_ROWS).cast(pa.float32()), # i + pc.random(NUM_ROWS).cast(pa.float32()), # i2 + pc.random(NUM_ROWS).cast(pa.float32()), # i3 ], ["f", "i", "i2", "i3"], ) @@ -57,12 +59,18 @@ def test_parquet_read(tmp_path: Path, benchmark, test_data, random_indices): parquet_path = tmp_path / "data.parquet" pq.write_table(test_data, parquet_path) + def read_parquet(): + parquet_file = pq.ParquetFile(parquet_path) + batches = parquet_file.iter_batches(batch_size=BATCH_SIZE) + tab_parquet = pa.Table.from_batches(batches) + return tab_parquet + if RANDOM_ACCESS == "indices": benchmark.pedantic( lambda: pq.read_table(parquet_path).take(random_indices), rounds=5 ) elif RANDOM_ACCESS == "full": - benchmark.pedantic(lambda: pq.read_table(parquet_path), rounds=5) + benchmark.pedantic(lambda: read_parquet(), rounds=5) def read_lance_file_random(lance_path, random_indices): @@ -73,7 +81,7 @@ def read_lance_file_random(lance_path, random_indices): def read_lance_file_full(lance_path): - for batch in LanceFileReader(lance_path).read_all(batch_size=1000).to_batches(): + for batch in LanceFileReader(lance_path).read_all(batch_size= BATCH_SIZE).to_batches(): pass diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index ba69944119e..85c55b6d1e6 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -824,7 +824,7 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { } } if let DataBlock::Struct(ref struct_data_block) = data { - // this condition is actually checked at `PrimitiveStructualEncoder::do_flush`, + // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`, // just being cautious here. if struct_data_block .children diff --git a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs index 0d323fc868b..8a8aa2252c5 100644 --- a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs +++ b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs @@ -20,6 +20,9 @@ use crate::{ use super::value::{ValueDecompressor, ValueEncoder}; +// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`. +// Only fields with fixed-width fields are supported for now, and the +// assumption that all fields has `bits_per_value % 8 == 0` is made. fn struct_data_block_to_fixed_width_data_block( struct_data_block: StructDataBlock, bits_per_values: &[u32], From 5f632f58a3d9ce97be2873ac82880e7e97d5d064 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Mon, 9 Dec 2024 15:49:39 -0500 Subject: [PATCH 13/15] fmt --- python/python/benchmarks/test_packed_struct.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 5131c440271..ab184e13ca0 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -16,8 +16,7 @@ RANDOM_ACCESS = "indices" NUM_INDICES = 1000 NUM_ROUNDS = 10 -#BATCH_SIZE = 16 * 1024 -BATCH_SIZE = 1000 +BATCH_SIZE = 16 * 1024 # This file compares benchmarks for reading and writing a StructArray column using # (i) parquet @@ -34,11 +33,11 @@ def test_data(tmp_path_factory): "struct_col": pa.StructArray.from_arrays( [ pc.random(NUM_ROWS).cast(pa.float32()), # f - pc.random(NUM_ROWS).cast(pa.float32()), # i - pc.random(NUM_ROWS).cast(pa.float32()), # i2 - pc.random(NUM_ROWS).cast(pa.float32()), # i3 + pc.random(NUM_ROWS).cast(pa.float32()), # i + pc.random(NUM_ROWS).cast(pa.float32()), # i2 + pc.random(NUM_ROWS).cast(pa.float32()), # i3 ], - ["f", "i", "i2", "i3"], + ["f1", "f2", "f3", "f4"], ) } ) @@ -81,7 +80,9 @@ def read_lance_file_random(lance_path, random_indices): def read_lance_file_full(lance_path): - for batch in LanceFileReader(lance_path).read_all(batch_size= BATCH_SIZE).to_batches(): + for batch in ( + LanceFileReader(lance_path).read_all(batch_size=BATCH_SIZE).to_batches() + ): pass From bf73273978eb9c41e80adaf9a8ca670f67eceac3 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Tue, 10 Dec 2024 10:48:37 -0500 Subject: [PATCH 14/15] address PR comments --- python/python/benchmarks/test_packed_struct.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index ab184e13ca0..96c887174a1 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -32,10 +32,10 @@ def test_data(tmp_path_factory): { "struct_col": pa.StructArray.from_arrays( [ - pc.random(NUM_ROWS).cast(pa.float32()), # f - pc.random(NUM_ROWS).cast(pa.float32()), # i - pc.random(NUM_ROWS).cast(pa.float32()), # i2 - pc.random(NUM_ROWS).cast(pa.float32()), # i3 + pc.random(NUM_ROWS).cast(pa.float32()), # f1 + pc.random(NUM_ROWS).cast(pa.float32()), # f2 + pc.random(NUM_ROWS).cast(pa.float32()), # f3 + pc.random(NUM_ROWS).cast(pa.float32()), # f4 ], ["f1", "f2", "f3", "f4"], ) From d4b5a008d4f82d6ac4824bc49d229b56e15cb8c1 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Tue, 10 Dec 2024 11:16:27 -0500 Subject: [PATCH 15/15] address PR comments --- rust/lance-arrow/src/schema.rs | 11 +++++++++++ rust/lance-core/src/datatypes/field.rs | 9 +++++++++ rust/lance-encoding/src/data.rs | 2 ++ rust/lance-encoding/src/decoder.rs | 7 +------ rust/lance-encoding/src/encoder.rs | 7 +------ .../src/encodings/logical/primitive.rs | 2 +- .../src/encodings/logical/struct.rs | 8 ++------ .../src/encodings/physical/struct_encoding.rs | 15 +++++++++++---- rust/lance-encoding/src/statistics.rs | 19 +++---------------- rust/lance-file/src/v2/reader.rs | 7 +------ 10 files changed, 42 insertions(+), 45 deletions(-) diff --git a/rust/lance-arrow/src/schema.rs b/rust/lance-arrow/src/schema.rs index 73f1f969647..aa48ce9352d 100644 --- a/rust/lance-arrow/src/schema.rs +++ b/rust/lance-arrow/src/schema.rs @@ -32,6 +32,8 @@ pub trait FieldExt { /// /// This is intended for display purposes and not for serialization fn to_compact_string(&self, indent: Indentation) -> String; + + fn is_packed_struct(&self) -> bool; } impl FieldExt for Field { @@ -79,6 +81,15 @@ impl FieldExt for Field { } result } + + // Check if field has metadata `packed` set to true, this check is case insensitive. + fn is_packed_struct(&self) -> bool { + let field_metadata = self.metadata(); + field_metadata + .get("packed") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false) + } } /// Extends the functionality of [arrow_schema::Schema]. diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 91eade2fa7c..c2492d031ef 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -731,6 +731,15 @@ impl Field { } None } + + // Check if field has metadata `packed` set to true, this check is case insensitive. + pub fn is_packed_struct(&self) -> bool { + let field_metadata = &self.metadata; + field_metadata + .get("packed") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false) + } } impl fmt::Display for Field { diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index bbdb477afe7..a4105b6d8ce 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -354,6 +354,8 @@ impl StructDataBlockBuilder { fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { let mut children = vec![]; + debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0)); + let bytes_per_row: u32 = bits_per_values.iter().sum::() / 8; let bytes_per_row = bytes_per_row as u64; diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index b4dffc81a61..2e47b91b084 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -764,12 +764,7 @@ impl CoreFieldDecoderStrategy { } match &data_type { DataType::Struct(fields) => { - let field_metadata = &field.metadata; - if field_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { + if field.is_packed_struct() { let column_info = column_infos.expect_next()?; let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( column_info.as_ref(), diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 85c55b6d1e6..f8db823e66a 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -1221,12 +1221,7 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy { todo!() } DataType::Struct(_) => { - let field_metadata = &field.metadata; - if field_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { + if field.is_packed_struct() { Ok(Box::new(PrimitiveStructuralEncoder::try_new( options, self.compression_strategy.clone(), diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 5621e8ca71d..22389fbdad6 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -21,7 +21,7 @@ use lance_core::utils::hash::U8SliceKey; use log::{debug, trace}; use snafu::{location, Location}; -use crate::data::{AllNullDataBlock, DataBlock, StructDataBlock, VariableWidthBlock}; +use crate::data::{AllNullDataBlock, DataBlock, VariableWidthBlock}; use crate::decoder::PerValueDecompressor; use crate::encoder::PerValueDataBlock; use crate::repdef::{ diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 88cec04d327..c1f484f65ba 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -15,6 +15,7 @@ use futures::{ FutureExt, StreamExt, TryStreamExt, }; use itertools::Itertools; +use lance_arrow::FieldExt; use log::trace; use snafu::{location, Location}; @@ -608,12 +609,7 @@ impl StructuralStructDecoder { ) -> Box { match field.data_type() { DataType::Struct(fields) => { - let field_metadata = field.metadata(); - if field_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { + if field.is_packed_struct() { let decoder = StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); Box::new(decoder) diff --git a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs index 8a8aa2252c5..493356e374f 100644 --- a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs +++ b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs @@ -65,11 +65,15 @@ impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder { match data { DataBlock::Struct(struct_data_block) => { let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::>(); + + // transform struct datablock to fixed-width data block. let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values); + // store and transformed fixed-width data block. let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box; let (value_miniblock_compressed, value_array_encoding) = value_miniblock_compressor.compress(data_block)?; + Ok(( value_miniblock_compressed, ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values), @@ -104,7 +108,7 @@ impl PackedStructFixedWidthMiniBlockDecompressor { .unwrap() { pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)), - _ => panic!("Unsupported array encoding"), + _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."), }; Self { bits_per_values: description.bits_per_values.clone(), @@ -127,10 +131,11 @@ impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor { .collect::>(); assert!(encoded_data_block.bits_per_value % 8 == 0); - let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize; - let mut prefix_sum = vec![0; self.bits_per_values.len() + 1]; - for i in 0..self.bits_per_values.len() { + + // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`. + let mut prefix_sum = vec![0; self.bits_per_values.len()]; + for i in 0..(self.bits_per_values.len() - 1) { prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i]; } @@ -140,10 +145,12 @@ impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor { let mut child_buf: Vec = Vec::with_capacity(child_buf_size); for j in 0..num_values as usize { + // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`. let this_value = encoded_data_block.data.slice_with_length( prefix_sum[i] + (j * encoded_bytes_per_row), bytes_per_values[i], ); + child_buf.extend_from_slice(&this_value); } diff --git a/rust/lance-encoding/src/statistics.rs b/rust/lance-encoding/src/statistics.rs index 99b07a0aafc..9596ab2099e 100644 --- a/rust/lance-encoding/src/statistics.rs +++ b/rust/lance-encoding/src/statistics.rs @@ -13,8 +13,8 @@ use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use num_traits::PrimInt; use crate::data::{ - AllNullDataBlock, DataBlock, DictionaryDataBlock, FixedSizeListBlock, FixedWidthDataBlock, - NullableDataBlock, OpaqueBlock, StructDataBlock, VariableWidthBlock, + AllNullDataBlock, DataBlock, DictionaryDataBlock, FixedWidthDataBlock, NullableDataBlock, + OpaqueBlock, StructDataBlock, VariableWidthBlock, }; #[derive(Clone, Copy, PartialEq, Eq, Hash)] @@ -156,7 +156,7 @@ impl GetStat for DataBlock { Self::AllNull(data_block) => data_block.get_stat(stat), Self::Nullable(data_block) => data_block.get_stat(stat), Self::FixedWidth(data_block) => data_block.get_stat(stat), - Self::FixedSizeList(data_block) => data_block.get_stat(stat), + Self::FixedSizeList(_) => None, Self::VariableWidth(data_block) => data_block.get_stat(stat), Self::Opaque(data_block) => data_block.get_stat(stat), Self::Struct(data_block) => data_block.get_stat(stat), @@ -165,19 +165,6 @@ impl GetStat for DataBlock { } } -impl GetStat for FixedSizeListBlock { - fn get_stat(&self, stat: Stat) -> Option> { - match stat { - Stat::MaxLength => { - let max_len = self.dimension * self.child.expect_single_stat::(stat); - Some(Arc::new(UInt64Array::from(vec![max_len]))) - } - Stat::DataSize => self.child.get_stat(stat), - _ => None, - } - } -} - // NullableDataBlock will be deprecated in Lance 2.1. impl GetStat for NullableDataBlock { // This function simply returns the statistics of the inner `DataBlock` of `NullableDataBlock`, diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index e3d4bea42d1..77888bf2761 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -228,16 +228,11 @@ impl ReaderProjection { let mut curr_column_idx = 0; let mut packed_struct_fields_num = 0; for field in schema.fields_pre_order() { - let field_metadata = &field.metadata; if packed_struct_fields_num > 0 { packed_struct_fields_num -= 1; continue; } - if field_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { + if field.is_packed_struct() { column_indices.push(curr_column_idx); curr_column_idx += 1; packed_struct_fields_num = field.children.len();