Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
72d6920
packed-struct encoding
broccoliSpicy Dec 4, 2024
409a465
correct `estimated_bytes` computation
broccoliSpicy Dec 4, 2024
cd8343e
a workable version for schemas with more than one column
broccoliSpicy Dec 5, 2024
90f82e7
remove fixed-size-list field in benchmark script
broccoliSpicy Dec 5, 2024
4297877
Merge branch 'main' into packed-struct-encoding
broccoliSpicy Dec 5, 2024
5bfc8c8
remove the added statistics gathering for FixedSizeListBlock
broccoliSpicy Dec 5, 2024
292f0d7
Merge remote-tracking branch 'origin/packed-struct-encoding'
broccoliSpicy Dec 5, 2024
ba4c6d6
fix merge error
broccoliSpicy Dec 5, 2024
ce89ff1
fmt
broccoliSpicy Dec 5, 2024
9a5a13f
fmt
broccoliSpicy Dec 5, 2024
c38ed70
fix bug with `create_structural_field_scheduler`
broccoliSpicy Dec 6, 2024
c28b3cf
correct `estimate size` in StructDataBlockBuilder.
broccoliSpicy Dec 9, 2024
40e29c9
add check in `PrimitiveStructuralEncoder::do_flush` to make sure fields
broccoliSpicy Dec 9, 2024
f54c873
use random float number in test_packed_struct.py
broccoliSpicy Dec 9, 2024
5f632f5
fmt
broccoliSpicy Dec 9, 2024
ac74812
Merge branch 'main' into packed-struct-encoding
broccoliSpicy Dec 9, 2024
bf73273
address PR comments
broccoliSpicy Dec 10, 2024
e2b6c9a
Merge remote-tracking branch 'origin/packed-struct-encoding'
broccoliSpicy Dec 10, 2024
b15eec9
Merge branch 'main' into packed-struct-encoding
broccoliSpicy Dec 10, 2024
d4b5a00
address PR comments
broccoliSpicy Dec 10, 2024
7bab88c
Merge remote-tracking branch 'origin/packed-struct-encoding'
broccoliSpicy Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ message PackedStruct {
Buffer buffer = 2;
}

// Encoding for a packed struct whose fields are all fixed-width values,
// stored in mini-block layout: the children's values are zipped row-by-row
// into a single flat buffer.
message PackedStructFixedWidthMiniBlock {
  // Encoding of the packed, row-major values buffer.
  // NOTE(review): field name `Flat` is not lower_snake_case per proto style;
  // renaming is wire-compatible but changes generated accessors — confirm
  // whether it should be `flat`.
  ArrayEncoding Flat = 1;
  // Bit width of each struct field, in field order. The Rust encoder asserts
  // these are multiples of 8 — presumably a hard requirement of this encoding.
  repeated uint32 bits_per_values = 2;
}

message FixedSizeBinary {
ArrayEncoding bytes = 1;
uint32 byte_width = 2;
Expand All @@ -283,6 +288,7 @@ message ArrayEncoding {
BinaryMiniBlock binary_mini_block = 15;
FsstMiniBlock fsst_mini_block = 16;
BinaryBlock binary_block = 17;
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18;
}
}

Expand Down
31 changes: 19 additions & 12 deletions python/python/benchmarks/test_packed_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

NUM_ROWS = 10_000_000
RANDOM_ACCESS = "indices"
NUM_INDICES = 100
NUM_INDICES = 1000
NUM_ROUNDS = 10
BATCH_SIZE = 16 * 1024

# This file compares benchmarks for reading and writing a StructArray column using
# (i) parquet
Expand All @@ -31,15 +32,12 @@ def test_data(tmp_path_factory):
{
"struct_col": pa.StructArray.from_arrays(
[
pc.random(NUM_ROWS).cast(pa.float32()),
pa.array(range(NUM_ROWS), type=pa.int32()),
pa.FixedSizeListArray.from_arrays(
pc.random(NUM_ROWS * 5).cast(pa.float32()), 5
),
pa.array(range(NUM_ROWS), type=pa.int32()),
pa.array(range(NUM_ROWS), type=pa.int32()),
pc.random(NUM_ROWS).cast(pa.float32()), # f1
pc.random(NUM_ROWS).cast(pa.float32()), # f2
pc.random(NUM_ROWS).cast(pa.float32()), # f3
pc.random(NUM_ROWS).cast(pa.float32()), # f4
],
["f", "i", "fsl", "i2", "i3"],
["f1", "f2", "f3", "f4"],
)
}
)
Expand All @@ -51,6 +49,7 @@ def test_data(tmp_path_factory):
@pytest.fixture(scope="module")
def random_indices():
    """Sorted random row indices used by the random-access benchmarks.

    Returns NUM_INDICES indices uniformly drawn from the valid row range
    [0, NUM_ROWS - 1].
    """
    # random.randint is inclusive on both ends, so the upper bound must be
    # NUM_ROWS - 1: a table with NUM_ROWS rows has no row at index NUM_ROWS,
    # and take(NUM_ROWS) would raise an out-of-bounds error.
    random_indices = [random.randint(0, NUM_ROWS - 1) for _ in range(NUM_INDICES)]
    # Indices are sorted before use — presumably because sorted take() is
    # faster for both readers; confirm against the benchmark intent.
    random_indices.sort()
    return random_indices


Expand All @@ -59,12 +58,18 @@ def test_parquet_read(tmp_path: Path, benchmark, test_data, random_indices):
parquet_path = tmp_path / "data.parquet"
pq.write_table(test_data, parquet_path)

def read_parquet():
parquet_file = pq.ParquetFile(parquet_path)
batches = parquet_file.iter_batches(batch_size=BATCH_SIZE)
tab_parquet = pa.Table.from_batches(batches)
return tab_parquet

if RANDOM_ACCESS == "indices":
benchmark.pedantic(
lambda: pq.read_table(parquet_path).take(random_indices), rounds=5
)
elif RANDOM_ACCESS == "full":
benchmark.pedantic(lambda: pq.read_table(parquet_path), rounds=5)
benchmark.pedantic(lambda: read_parquet(), rounds=5)


def read_lance_file_random(lance_path, random_indices):
Expand All @@ -75,7 +80,9 @@ def read_lance_file_random(lance_path, random_indices):


def read_lance_file_full(lance_path):
for batch in LanceFileReader(lance_path).read_all(batch_size=1000).to_batches():
for batch in (
LanceFileReader(lance_path).read_all(batch_size=BATCH_SIZE).to_batches()
):
pass


Expand Down Expand Up @@ -127,7 +134,7 @@ def test_parquet_write(tmp_path: Path, benchmark, test_data):


def write_lance_file(lance_path, test_data):
with LanceFileWriter(lance_path, test_data.schema) as writer:
with LanceFileWriter(lance_path, test_data.schema, version="2.1") as writer:
for batch in test_data.to_batches():
writer.write_batch(batch)

Expand Down
11 changes: 11 additions & 0 deletions rust/lance-arrow/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub trait FieldExt {
///
/// This is intended for display purposes and not for serialization
fn to_compact_string(&self, indent: Indentation) -> String;

fn is_packed_struct(&self) -> bool;
}

impl FieldExt for Field {
Expand Down Expand Up @@ -79,6 +81,15 @@ impl FieldExt for Field {
}
result
}

/// Returns true when this field's metadata marks it as a packed struct.
///
/// Looks up the `packed` metadata key; the value comparison is
/// case-insensitive, so "true", "True", and "TRUE" all enable packing.
/// Absent or non-"true" values mean not packed.
fn is_packed_struct(&self) -> bool {
    self.metadata()
        .get("packed")
        .map_or(false, |value| value.to_lowercase() == "true")
}
}

/// Extends the functionality of [arrow_schema::Schema].
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,15 @@ impl Field {
}
None
}

/// Returns true when this field's metadata marks it as a packed struct.
///
/// Looks up the `packed` metadata key; the value comparison is
/// case-insensitive. Absent or non-"true" values mean not packed.
pub fn is_packed_struct(&self) -> bool {
    self.metadata
        .get("packed")
        .map_or(false, |value| value.to_lowercase() == "true")
}
}

impl fmt::Display for Field {
Expand Down
75 changes: 74 additions & 1 deletion rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,53 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
}
}

/// Builder that assembles a packed-struct `DataBlock` by delegating each
/// child column to its own per-field builder.
#[derive(Debug)]
struct StructDataBlockBuilder {
    // One builder per struct child, kept in field order.
    children: Vec<Box<dyn DataBlockBuilderImpl>>,
}

impl StructDataBlockBuilder {
// Currently only Struct with fixed-width fields are supported.
// And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
let mut children = vec![];

debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));

let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
let bytes_per_row = bytes_per_row as u64;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a debug assert to verify our assumption that bits_per_values.iter().all(|bpv| bpv % 8 == 0)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks.


for bits_per_value in bits_per_values.iter() {
let this_estimated_size_bytes =
estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
let child =
FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
}
Self { children }
}
}

impl DataBlockBuilderImpl for StructDataBlockBuilder {
    /// Appends the selected row range of each child of `data_block` to the
    /// matching child builder. Panics if `data_block` is not a struct block.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
        let struct_block = data_block.as_struct_ref().unwrap();
        for (idx, builder) in self.children.iter_mut().enumerate() {
            builder.append(&struct_block.children[idx], selection.clone());
        }
    }

    /// Consumes the builder and produces the assembled `StructDataBlock`.
    fn finish(self: Box<Self>) -> DataBlock {
        let children_data_block = self
            .children
            .into_iter()
            .map(|builder| builder.finish())
            .collect();
        DataBlock::Struct(StructDataBlock {
            children: children_data_block,
            block_info: BlockInfo::new(),
        })
    }
}
/// A data block to represent a fixed size list
#[derive(Debug)]
pub struct FixedSizeListBlock {
Expand Down Expand Up @@ -586,6 +633,7 @@ impl VariableWidthBlock {
pub struct StructDataBlock {
/// The child arrays
pub children: Vec<DataBlock>,
pub block_info: BlockInfo,
}

impl StructDataBlock {
Expand Down Expand Up @@ -619,6 +667,7 @@ impl StructDataBlock {
.into_iter()
.map(|c| c.remove_validity())
.collect(),
block_info: self.block_info,
}
}

Expand All @@ -636,6 +685,7 @@ impl StructDataBlock {
.iter_mut()
.map(|c| c.borrow_and_clone())
.collect(),
block_info: self.block_info.clone(),
}
}

Expand All @@ -646,8 +696,16 @@ impl StructDataBlock {
.iter()
.map(|c| c.try_clone())
.collect::<Result<_>>()?,
block_info: self.block_info.clone(),
})
}

/// Returns the total data size in bytes, summed across all child blocks.
pub fn data_size(&self) -> u64 {
    self.children
        .iter()
        .map(|data_block| data_block.data_size())
        .sum()
}
}

/// A data block for dictionary encoded data
Expand Down Expand Up @@ -900,6 +958,18 @@ impl DataBlock {
inner.dimension,
))
}
Self::Struct(struct_data_block) => {
let mut bits_per_values = vec![];
for child in struct_data_block.children.iter() {
let child = child.as_fixed_width_ref().
expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
bits_per_values.push(child.bits_per_value as u32);
}
Box::new(StructDataBlockBuilder::new(
bits_per_values,
estimated_size_bytes,
))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -1359,7 +1429,10 @@ impl DataBlock {
.collect::<Vec<_>>();
children.push(Self::from_arrays(&child_vec, num_values));
}
Self::Struct(StructDataBlock { children })
Self::Struct(StructDataBlock {
children,
block_info: BlockInfo::default(),
})
}
DataType::FixedSizeList(_, dim) => {
let children = arrays
Expand Down
21 changes: 21 additions & 0 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBloc
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -512,6 +513,11 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
}
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
description,
)))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -752,11 +758,26 @@ impl CoreFieldDecoderStrategy {
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);

// advance to the next top level column
column_infos.next_top_level();

return Ok(scheduler);
}
match &data_type {
DataType::Struct(fields) => {
if field.is_packed_struct() {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);

// advance to the next top level column
column_infos.next_top_level();

return Ok(scheduler);
}
let mut child_schedulers = Vec::with_capacity(field.children.len());
for field in field.children.iter() {
let field_scheduler =
Expand Down
20 changes: 14 additions & 6 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
use crate::statistics::{GetStat, Stat};
Expand Down Expand Up @@ -832,6 +833,18 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
return Ok(Box::new(BinaryMiniBlockEncoder::default()));
}
}
if let DataBlock::Struct(ref struct_data_block) = data {
// this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
// just being cautious here.
if struct_data_block
.children
.iter()
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
{
panic!("packed struct encoding currently only supports fixed-width fields.")
}
return Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()));
}
Ok(Box::new(ValueEncoder::default()))
}

Expand Down Expand Up @@ -1225,12 +1238,7 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
}
DataType::Struct(_) => {
let field_metadata = &field.metadata;
if field_metadata
.get("packed")
.map(|v| v == "true")
.unwrap_or(false)
{
if field.is_packed_struct() {
Ok(Box::new(PrimitiveStructuralEncoder::try_new(
options,
self.compression_strategy.clone(),
Expand Down
12 changes: 12 additions & 0 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2599,6 +2599,18 @@ impl PrimitiveStructuralEncoder {
Self::encode_simple_all_null(column_idx, num_values, row_number)
} else {
let data_block = DataBlock::from_arrays(&arrays, num_values);

// if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding.
if let DataBlock::Struct(ref struct_data_block) = data_block {
if struct_data_block
.children
.iter()
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
{
panic!("packed struct encoding currently only supports fixed-width fields.")
}
}

const DICTIONARY_ENCODING_THRESHOLD: u64 = 100;
let cardinality =
if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
Expand Down
11 changes: 10 additions & 1 deletion rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::{
FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use log::trace;
use snafu::{location, Location};

Expand Down Expand Up @@ -607,7 +608,15 @@ impl StructuralStructDecoder {
should_validate: bool,
) -> Box<dyn StructuralFieldDecoder> {
match field.data_type() {
DataType::Struct(fields) => Box::new(Self::new(fields.clone(), should_validate, false)),
DataType::Struct(fields) => {
if field.is_packed_struct() {
let decoder =
StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
Box::new(decoder)
} else {
Box::new(Self::new(fields.clone(), should_validate, false))
}
}
DataType::List(child_field) | DataType::LargeList(child_field) => {
let child_decoder = Self::field_to_decoder(child_field, should_validate);
Box::new(StructuralListDecoder::new(
Expand Down
Loading