diff --git a/protos/encodings.proto b/protos/encodings.proto index a80f7ccd37e..1e98e7cb88d 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -258,6 +258,11 @@ message PackedStruct { Buffer buffer = 2; } +message PackedStructFixedWidthMiniBlock { + ArrayEncoding Flat = 1; + repeated uint32 bits_per_values = 2; +} + message FixedSizeBinary { ArrayEncoding bytes = 1; uint32 byte_width = 2; @@ -283,6 +288,7 @@ message ArrayEncoding { BinaryMiniBlock binary_mini_block = 15; FsstMiniBlock fsst_mini_block = 16; BinaryBlock binary_block = 17; + PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18; } } diff --git a/python/python/benchmarks/test_packed_struct.py b/python/python/benchmarks/test_packed_struct.py index 037470f01ce..96c887174a1 100644 --- a/python/python/benchmarks/test_packed_struct.py +++ b/python/python/benchmarks/test_packed_struct.py @@ -14,8 +14,9 @@ NUM_ROWS = 10_000_000 RANDOM_ACCESS = "indices" -NUM_INDICES = 100 +NUM_INDICES = 1000 NUM_ROUNDS = 10 +BATCH_SIZE = 16 * 1024 # This file compares benchmarks for reading and writing a StructArray column using # (i) parquet @@ -31,15 +32,12 @@ def test_data(tmp_path_factory): { "struct_col": pa.StructArray.from_arrays( [ - pc.random(NUM_ROWS).cast(pa.float32()), - pa.array(range(NUM_ROWS), type=pa.int32()), - pa.FixedSizeListArray.from_arrays( - pc.random(NUM_ROWS * 5).cast(pa.float32()), 5 - ), - pa.array(range(NUM_ROWS), type=pa.int32()), - pa.array(range(NUM_ROWS), type=pa.int32()), + pc.random(NUM_ROWS).cast(pa.float32()), # f1 + pc.random(NUM_ROWS).cast(pa.float32()), # f2 + pc.random(NUM_ROWS).cast(pa.float32()), # f3 + pc.random(NUM_ROWS).cast(pa.float32()), # f4 ], - ["f", "i", "fsl", "i2", "i3"], + ["f1", "f2", "f3", "f4"], ) } ) @@ -51,6 +49,7 @@ def test_data(tmp_path_factory): @pytest.fixture(scope="module") def random_indices(): random_indices = [random.randint(0, NUM_ROWS) for _ in range(NUM_INDICES)] + random_indices.sort() return random_indices @@ -59,12 
+58,18 @@ def test_parquet_read(tmp_path: Path, benchmark, test_data, random_indices): parquet_path = tmp_path / "data.parquet" pq.write_table(test_data, parquet_path) + def read_parquet(): + parquet_file = pq.ParquetFile(parquet_path) + batches = parquet_file.iter_batches(batch_size=BATCH_SIZE) + tab_parquet = pa.Table.from_batches(batches) + return tab_parquet + if RANDOM_ACCESS == "indices": benchmark.pedantic( lambda: pq.read_table(parquet_path).take(random_indices), rounds=5 ) elif RANDOM_ACCESS == "full": - benchmark.pedantic(lambda: pq.read_table(parquet_path), rounds=5) + benchmark.pedantic(lambda: read_parquet(), rounds=5) def read_lance_file_random(lance_path, random_indices): @@ -75,7 +80,9 @@ def read_lance_file_random(lance_path, random_indices): def read_lance_file_full(lance_path): - for batch in LanceFileReader(lance_path).read_all(batch_size=1000).to_batches(): + for batch in ( + LanceFileReader(lance_path).read_all(batch_size=BATCH_SIZE).to_batches() + ): pass @@ -127,7 +134,7 @@ def test_parquet_write(tmp_path: Path, benchmark, test_data): def write_lance_file(lance_path, test_data): - with LanceFileWriter(lance_path, test_data.schema) as writer: + with LanceFileWriter(lance_path, test_data.schema, version="2.1") as writer: for batch in test_data.to_batches(): writer.write_batch(batch) diff --git a/rust/lance-arrow/src/schema.rs b/rust/lance-arrow/src/schema.rs index 73f1f969647..aa48ce9352d 100644 --- a/rust/lance-arrow/src/schema.rs +++ b/rust/lance-arrow/src/schema.rs @@ -32,6 +32,8 @@ pub trait FieldExt { /// /// This is intended for display purposes and not for serialization fn to_compact_string(&self, indent: Indentation) -> String; + + fn is_packed_struct(&self) -> bool; } impl FieldExt for Field { @@ -79,6 +81,15 @@ impl FieldExt for Field { } result } + + // Check if field has metadata `packed` set to true, this check is case insensitive. 
+ fn is_packed_struct(&self) -> bool { + let field_metadata = self.metadata(); + field_metadata + .get("packed") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false) + } } /// Extends the functionality of [arrow_schema::Schema]. diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 91eade2fa7c..c2492d031ef 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -731,6 +731,15 @@ impl Field { } None } + + // Check if field has metadata `packed` set to true, this check is case insensitive. + pub fn is_packed_struct(&self) -> bool { + let field_metadata = &self.metadata; + field_metadata + .get("packed") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false) + } } impl fmt::Display for Field { diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 1c82a03e4bc..a4105b6d8ce 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -343,6 +343,53 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder { } } +#[derive(Debug)] +struct StructDataBlockBuilder { + children: Vec>, +} + +impl StructDataBlockBuilder { + // Currently only Struct with fixed-width fields are supported. + // And the assumption that all fields have `bits_per_value % 8 == 0` is made here. 
+ fn new(bits_per_values: Vec, estimated_size_bytes: u64) -> Self { + let mut children = vec![]; + + debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0)); + + let bytes_per_row: u32 = bits_per_values.iter().sum::() / 8; + let bytes_per_row = bytes_per_row as u64; + + for bits_per_value in bits_per_values.iter() { + let this_estimated_size_bytes = + estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8; + let child = + FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes); + children.push(Box::new(child) as Box); + } + Self { children } + } +} + +impl DataBlockBuilderImpl for StructDataBlockBuilder { + fn append(&mut self, data_block: &DataBlock, selection: Range) { + let data_block = data_block.as_struct_ref().unwrap(); + for i in 0..self.children.len() { + self.children[i].append(&data_block.children[i], selection.clone()); + } + } + + fn finish(self: Box) -> DataBlock { + let mut children_data_block = Vec::new(); + for child in self.children { + let child_data_block = child.finish(); + children_data_block.push(child_data_block); + } + DataBlock::Struct(StructDataBlock { + children: children_data_block, + block_info: BlockInfo::new(), + }) + } +} /// A data block to represent a fixed size list #[derive(Debug)] pub struct FixedSizeListBlock { @@ -586,6 +633,7 @@ impl VariableWidthBlock { pub struct StructDataBlock { /// The child arrays pub children: Vec, + pub block_info: BlockInfo, } impl StructDataBlock { @@ -619,6 +667,7 @@ impl StructDataBlock { .into_iter() .map(|c| c.remove_validity()) .collect(), + block_info: self.block_info, } } @@ -636,6 +685,7 @@ impl StructDataBlock { .iter_mut() .map(|c| c.borrow_and_clone()) .collect(), + block_info: self.block_info.clone(), } } @@ -646,8 +696,16 @@ impl StructDataBlock { .iter() .map(|c| c.try_clone()) .collect::>()?, + block_info: self.block_info.clone(), }) } + + pub fn data_size(&self) -> u64 { + self.children + .iter() + .map(|data_block| 
data_block.data_size()) + .sum() + } } /// A data block for dictionary encoded data @@ -900,6 +958,18 @@ impl DataBlock { inner.dimension, )) } + Self::Struct(struct_data_block) => { + let mut bits_per_values = vec![]; + for child in struct_data_block.children.iter() { + let child = child.as_fixed_width_ref(). + expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported."); + bits_per_values.push(child.bits_per_value as u32); + } + Box::new(StructDataBlockBuilder::new( + bits_per_values, + estimated_size_bytes, + )) + } _ => todo!(), } } @@ -1359,7 +1429,10 @@ impl DataBlock { .collect::>(); children.push(Self::from_arrays(&child_vec, num_values)); } - Self::Struct(StructDataBlock { children }) + Self::Struct(StructDataBlock { + children, + block_info: BlockInfo::default(), + }) } DataType::FixedSizeList(_, dim) => { let children = arrays diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index dcfeeb2ecc1..d8f8a45e0a6 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -252,6 +252,7 @@ use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBloc use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor; use crate::encodings::physical::fsst::FsstMiniBlockDecompressor; +use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor; use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; @@ -512,6 +513,11 @@ impl DecompressorStrategy for CoreDecompressorStrategy { pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => { Ok(Box::new(FsstMiniBlockDecompressor::new(description))) } + 
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => { + Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new( + description, + ))) + } _ => todo!(), } } @@ -752,11 +758,26 @@ impl CoreFieldDecoderStrategy { column_info.as_ref(), self.decompressor_strategy.as_ref(), )?); + + // advance to the next top level column column_infos.next_top_level(); + return Ok(scheduler); } match &data_type { DataType::Struct(fields) => { + if field.is_packed_struct() { + let column_info = column_infos.expect_next()?; + let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( + column_info.as_ref(), + self.decompressor_strategy.as_ref(), + )?); + + // advance to the next top level column + column_infos.next_top_level(); + + return Ok(scheduler); + } let mut child_schedulers = Vec::with_capacity(field.children.len()); for field in field.children.iter() { let field_scheduler = diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 960bcb290a9..c329bd55e1c 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -34,6 +34,7 @@ use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; use crate::encodings::physical::fixed_size_list::FslPerValueCompressor; use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder}; use crate::encodings::physical::packed_struct::PackedStructEncoder; +use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder; use crate::format::ProtobufUtils; use crate::repdef::RepDefBuilder; use crate::statistics::{GetStat, Stat}; @@ -832,6 +833,18 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { return Ok(Box::new(BinaryMiniBlockEncoder::default())); } } + if let DataBlock::Struct(ref struct_data_block) = data { + // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`, + // just being cautious here. 
+ if struct_data_block + .children + .iter() + .any(|child| !matches!(child, DataBlock::FixedWidth(_))) + { + panic!("packed struct encoding currently only supports fixed-width fields.") + } + return Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default())); + } Ok(Box::new(ValueEncoder::default())) } @@ -1225,12 +1238,7 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy { Ok(Box::new(ListStructuralEncoder::new(child_encoder))) } DataType::Struct(_) => { - let field_metadata = &field.metadata; - if field_metadata - .get("packed") - .map(|v| v == "true") - .unwrap_or(false) - { + if field.is_packed_struct() { Ok(Box::new(PrimitiveStructuralEncoder::try_new( options, self.compression_strategy.clone(), diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index ee7810646e4..d00e2d1e71c 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -2599,6 +2599,18 @@ impl PrimitiveStructuralEncoder { Self::encode_simple_all_null(column_idx, num_values, row_number) } else { let data_block = DataBlock::from_arrays(&arrays, num_values); + + // if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding. 
+ if let DataBlock::Struct(ref struct_data_block) = data_block { + if struct_data_block + .children + .iter() + .any(|child| !matches!(child, DataBlock::FixedWidth(_))) + { + panic!("packed struct encoding currently only supports fixed-width fields.") + } + } + const DICTIONARY_ENCODING_THRESHOLD: u64 = 100; let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) { diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 6a320211977..92320fe4d31 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -15,6 +15,7 @@ use futures::{ FutureExt, StreamExt, TryStreamExt, }; use itertools::Itertools; +use lance_arrow::FieldExt; use log::trace; use snafu::{location, Location}; @@ -607,7 +608,15 @@ impl StructuralStructDecoder { should_validate: bool, ) -> Box { match field.data_type() { - DataType::Struct(fields) => Box::new(Self::new(fields.clone(), should_validate, false)), + DataType::Struct(fields) => { + if field.is_packed_struct() { + let decoder = + StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate); + Box::new(decoder) + } else { + Box::new(Self::new(fields.clone(), should_validate, false)) + } + } DataType::List(child_field) | DataType::LargeList(child_field) => { let child_decoder = Self::field_to_decoder(child_field, should_validate); Box::new(StructuralListDecoder::new( diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index a108b679e16..8f5f76c787e 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -29,6 +29,7 @@ pub mod fixed_size_binary; pub mod fixed_size_list; pub mod fsst; pub mod packed_struct; +pub mod struct_encoding; pub mod value; /// These contain the file buffers shared across the entire file @@ -287,6 +288,7 @@ pub fn 
decoder_from_array_encoding( pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(), pb::array_encoding::ArrayEncoding::FsstMiniBlock(_) => unreachable!(), pb::array_encoding::ArrayEncoding::BinaryBlock(_) => unreachable!(), + pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(_) => unreachable!(), } } diff --git a/rust/lance-encoding/src/encodings/physical/packed_struct.rs b/rust/lance-encoding/src/encodings/physical/packed_struct.rs index 4feca6d9c4c..a513b5316b5 100644 --- a/rust/lance-encoding/src/encodings/physical/packed_struct.rs +++ b/rust/lance-encoding/src/encodings/physical/packed_struct.rs @@ -151,7 +151,10 @@ impl PrimitivePageDecoder for PackedStructPageDecoder { let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type()); children.push(child_block); } - Ok(DataBlock::Struct(StructDataBlock { children })) + Ok(DataBlock::Struct(StructDataBlock { + children, + block_info: BlockInfo::default(), + })) } } @@ -266,9 +269,13 @@ pub mod tests { testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases}, version::LanceFileVersion, }; + use rstest::rstest; + #[rstest] #[test_log::test(tokio::test)] - async fn test_random_packed_struct() { + async fn test_random_packed_struct( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { let data_type = DataType::Struct(Fields::from(vec![ Field::new("a", DataType::UInt64, false), Field::new("b", DataType::UInt32, false), @@ -278,11 +285,14 @@ pub mod tests { let field = Field::new("", data_type, false).with_metadata(metadata); - check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await; + check_round_trip_encoding_random(field, version).await; } + #[rstest] #[test_log::test(tokio::test)] - async fn test_specific_packed_struct() { + async fn test_specific_packed_struct( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { let array1 = 
Arc::new(UInt64Array::from(vec![1, 2, 3, 4])); let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8])); let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12])); @@ -325,7 +335,8 @@ pub mod tests { .with_range(0..2) .with_range(0..6) .with_range(1..4) - .with_indices(vec![1, 3, 7]); + .with_indices(vec![1, 3, 7]) + .with_file_version(version); let mut metadata = HashMap::new(); metadata.insert("packed".to_string(), "true".to_string()); @@ -338,8 +349,12 @@ pub mod tests { .await; } + // the current Lance V2.1 `packed-struct encoding` doesn't support `fixed size list`. + #[rstest] #[test_log::test(tokio::test)] - async fn test_fsl_packed_struct() { + async fn test_fsl_packed_struct( + #[values(LanceFileVersion::V2_0, /*LanceFileVersion::V2_1)*/)] version: LanceFileVersion, + ) { let int_array = Arc::new(Int32Array::from(vec![12, 13, 14, 15])); let list_data_type = @@ -367,7 +382,8 @@ pub mod tests { .with_range(1..3) .with_range(0..1) .with_range(2..4) - .with_indices(vec![0, 2, 3]); + .with_indices(vec![0, 2, 3]) + .with_file_version(version); let mut metadata = HashMap::new(); metadata.insert("packed".to_string(), "true".to_string()); diff --git a/rust/lance-encoding/src/encodings/physical/struct_encoding.rs b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs new file mode 100644 index 00000000000..493356e374f --- /dev/null +++ b/rust/lance-encoding/src/encodings/physical/struct_encoding.rs @@ -0,0 +1,170 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use arrow::datatypes::UInt64Type; + +use lance_core::{Error, Result}; +use snafu::{location, Location}; + +use crate::{ + buffer::LanceBuffer, + data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock}, + decoder::MiniBlockDecompressor, + encoder::{MiniBlockCompressed, MiniBlockCompressor}, + format::{ + pb::{self}, + ProtobufUtils, + }, + statistics::{GetStat, Stat}, +}; + +use super::value::{ValueDecompressor, ValueEncoder}; + +// 
Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`. +// Only structs with fixed-width fields are supported for now, and the +// assumption that all fields have `bits_per_value % 8 == 0` is made. +fn struct_data_block_to_fixed_width_data_block( + struct_data_block: StructDataBlock, + bits_per_values: &[u32], +) -> DataBlock { + let data_size = struct_data_block.expect_single_stat::(Stat::DataSize); + let mut output = Vec::with_capacity(data_size as usize); + let num_values = struct_data_block.children[0].num_values(); + + for i in 0..num_values as usize { + for (j, child) in struct_data_block.children.iter().enumerate() { + let bytes_per_value = (bits_per_values[j] / 8) as usize; + let this_data = child + .as_fixed_width_ref() + .unwrap() + .data + .slice_with_length(bytes_per_value * i, bytes_per_value); + output.extend_from_slice(&this_data); + } + } + + DataBlock::FixedWidth(FixedWidthDataBlock { + bits_per_value: bits_per_values + .iter() + .map(|bits_per_value| *bits_per_value as u64) + .sum(), + data: LanceBuffer::Owned(output), + num_values, + block_info: BlockInfo::default(), + }) +} + +#[derive(Debug, Default)] +pub struct PackedStructFixedWidthMiniBlockEncoder {} + +impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder { + fn compress( + &self, + data: DataBlock, + ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> { + match data { + DataBlock::Struct(struct_data_block) => { + let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::>(); + + // transform struct datablock to fixed-width data block. + let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values); + + // store the transformed fixed-width data block. 
+ let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box; + let (value_miniblock_compressed, value_array_encoding) = + value_miniblock_compressor.compress(data_block)?; + + Ok(( + value_miniblock_compressed, + ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values), + )) + } + _ => Err(Error::InvalidInput { + source: format!( + "Cannot compress a data block of type {} with PackedStructFixedWidthMiniBlockEncoder", + data.name() + ) + .into(), + location: location!(), + }), + } + } +} + +#[derive(Debug)] +pub struct PackedStructFixedWidthMiniBlockDecompressor { + bits_per_values: Vec, + array_encoding: Box, +} + +impl PackedStructFixedWidthMiniBlockDecompressor { + pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self { + let array_encoding: Box = match description + .flat + .as_ref() + .unwrap() + .array_encoding + .as_ref() + .unwrap() + { + pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)), + _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."), + }; + Self { + bits_per_values: description.bits_per_values.clone(), + array_encoding, + } + } +} + +impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + let encoded_data_block = self.array_encoding.decompress(data, num_values)?; + let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else { + panic!("ValueDecompressor should output FixedWidth DataBlock") + }; + + let bytes_per_values = self + .bits_per_values + .iter() + .map(|bits_per_value| *bits_per_value as usize / 8) + .collect::>(); + + assert!(encoded_data_block.bits_per_value % 8 == 0); + let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize; + + // use a prefix_sum vector as a helper to reconstruct the `StructDataBlock`. 
+ let mut prefix_sum = vec![0; self.bits_per_values.len()]; + for i in 0..(self.bits_per_values.len() - 1) { + prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i]; + } + + let mut children_data_block = vec![]; + for i in 0..self.bits_per_values.len() { + let child_buf_size = bytes_per_values[i] * num_values as usize; + let mut child_buf: Vec = Vec::with_capacity(child_buf_size); + + for j in 0..num_values as usize { + // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`. + let this_value = encoded_data_block.data.slice_with_length( + prefix_sum[i] + (j * encoded_bytes_per_row), + bytes_per_values[i], + ); + + child_buf.extend_from_slice(&this_value); + } + + let child = DataBlock::FixedWidth(FixedWidthDataBlock { + data: LanceBuffer::Owned(child_buf), + bits_per_value: self.bits_per_values[i] as u64, + num_values, + block_info: BlockInfo::default(), + }); + children_data_block.push(child); + } + Ok(DataBlock::Struct(StructDataBlock { + children: children_data_block, + block_info: BlockInfo::default(), + })) + } +} diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index c01ac9ee457..608481e3e23 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -21,7 +21,8 @@ use pb::{ page_layout::Layout, AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, - MiniBlockLayout, Nullable, PackedStruct, PageLayout, RepDefLayer, + MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout, + RepDefLayer, }; use crate::{ @@ -174,6 +175,20 @@ impl ProtobufUtils { } } + pub fn packed_struct_fixed_width_mini_block( + data: ArrayEncoding, + bits_per_values: Vec, + ) -> ArrayEncoding { + ArrayEncoding { + array_encoding: 
Some(ArrayEncodingEnum::PackedStructFixedWidthMiniBlock( + Box::new(PackedStructFixedWidthMiniBlock { + flat: Some(Box::new(data)), + bits_per_values, + }), + )), + } + } + pub fn binary( indices_encoding: ArrayEncoding, bytes_encoding: ArrayEncoding, diff --git a/rust/lance-encoding/src/statistics.rs b/rust/lance-encoding/src/statistics.rs index cbd63177c07..9596ab2099e 100644 --- a/rust/lance-encoding/src/statistics.rs +++ b/rust/lance-encoding/src/statistics.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use arrow::array::AsArray; +use arrow::{array::AsArray, datatypes::UInt64Type}; use arrow_array::{Array, ArrowPrimitiveType, UInt64Array}; use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use num_traits::PrimInt; @@ -61,7 +61,7 @@ impl ComputeStat for DataBlock { Self::FixedSizeList(_) => {} Self::VariableWidth(data_block) => data_block.compute_stat(), Self::Opaque(data_block) => data_block.compute_stat(), - Self::Struct(_) => {} + Self::Struct(data_block) => data_block.compute_stat(), Self::Dictionary(_) => {} } } @@ -371,8 +371,30 @@ impl GetStat for DictionaryDataBlock { } impl GetStat for StructDataBlock { - fn get_stat(&self, _stat: Stat) -> Option> { - None + fn get_stat(&self, stat: Stat) -> Option> { + let block_info = self.block_info.0.read().unwrap(); + if block_info.is_empty() { + panic!("get_stat should be called after statistics are computed.") + } + block_info.get(&stat).cloned() + } +} + +impl ComputeStat for StructDataBlock { + fn compute_stat(&mut self) { + let data_size = self.data_size(); + let data_size_array = Arc::new(UInt64Array::from(vec![data_size])); + + let max_len = self + .children + .iter() + .map(|child| child.expect_single_stat::(Stat::MaxLength)) + .sum::(); + let max_len_array = Arc::new(UInt64Array::from(vec![max_len])); + + let mut info = self.block_info.0.write().unwrap(); + info.insert(Stat::DataSize, data_size_array); + info.insert(Stat::MaxLength, max_len_array); } } @@ -394,6 +416,7 @@ mod tests { use super::DataBlock; 
use arrow::{ + array::AsArray, compute::concat, datatypes::{Int32Type, UInt64Type}, }; @@ -442,18 +465,25 @@ mod tests { let fields = vec![ Arc::new(Field::new("int_field", DataType::Int32, false)), Arc::new(Field::new("float_field", DataType::Float32, false)), - Arc::new(Field::new( - "fsl_field", - DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 5), - false, - )), ] .into(); let mut gen = lance_datagen::array::rand_type(&DataType::Struct(fields)); let arr = gen.generate(RowCount::from(3), &mut rng).unwrap(); let block = DataBlock::from_array(arr.clone()); - assert!(block.get_stat(Stat::DataSize).is_none()); + let (_, arr_parts, _) = arr.as_struct().clone().into_parts(); + let total_buffer_size: usize = arr_parts + .iter() + .map(|arr| { + arr.to_data() + .buffers() + .iter() + .map(|buffer| buffer.len()) + .sum::() + }) + .sum(); + let data_size = block.expect_single_stat::(Stat::DataSize); + assert!(data_size == total_buffer_size as u64); // test DataType::Dictionary let mut gen = array::rand_type(&DataType::Dictionary( diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 172bc6f41cc..77888bf2761 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -218,23 +218,29 @@ impl ReaderProjection { /// /// If the schema provided is not the schema of the entire file then /// the projection will be invalid and the read will fail. + /// If the field is a `struct datatype` with `packed` set to true in the field metadata, + /// the whole struct has one column index. + /// To support nested `packed-struct encoding`, this method need to be further adjusted. 
pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self { let schema = Arc::new(schema.clone()); let is_structural = version >= LanceFileVersion::V2_1; - let mut counter = 0; - let counter = &mut counter; - let column_indices = schema - .fields_pre_order() - .filter_map(|field| { - if field.children.is_empty() || !is_structural { - let col_idx = *counter; - *counter += 1; - Some(col_idx) - } else { - None - } - }) - .collect::>(); + let mut column_indices = vec![]; + let mut curr_column_idx = 0; + let mut packed_struct_fields_num = 0; + for field in schema.fields_pre_order() { + if packed_struct_fields_num > 0 { + packed_struct_fields_num -= 1; + continue; + } + if field.is_packed_struct() { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + packed_struct_fields_num = field.children.len(); + } else if field.children.is_empty() || !is_structural { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + } + } Self { schema, column_indices,