Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions protos/encodings_v2_1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ message ConstantLayout {
// - MUST be absent for an all-null page
// - MUST be <= 32 bytes if present
optional bytes inline_value = 6;

// Optional compression algorithm used for the repetition buffer.
// If absent, repetition levels are stored as raw u16 values.
CompressiveEncoding rep_compression = 7;
// Optional compression algorithm used for the definition buffer.
// If absent, definition levels are stored as raw u16 values.
CompressiveEncoding def_compression = 8;
// Number of values in repetition buffer after decompression.
uint64 num_rep_values = 9;
// Number of values in definition buffer after decompression.
uint64 num_def_values = 10;
}

// A layout where large binary data is encoded externally and only
Expand Down
284 changes: 268 additions & 16 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,12 +779,20 @@ pub struct ComplexAllNullScheduler {
def_meaning: Arc<[DefinitionInterpretation]>,
repdef: Option<Arc<CachedComplexAllNullState>>,
max_visible_level: u16,
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
num_rep_values: u64,
num_def_values: u64,
}

impl ComplexAllNullScheduler {
pub fn new(
buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
def_meaning: Arc<[DefinitionInterpretation]>,
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
num_rep_values: u64,
num_def_values: u64,
) -> Self {
let max_visible_level = def_meaning
.iter()
Expand All @@ -796,6 +804,10 @@ impl ComplexAllNullScheduler {
def_meaning,
repdef: None,
max_visible_level,
rep_decompressor,
def_decompressor,
num_rep_values,
num_def_values,
}
}
}
Expand All @@ -820,25 +832,89 @@ impl StructuralPageScheduler for ComplexAllNullScheduler {
}

let data = io.submit_request(reads, 0);
let rep_decompressor = self.rep_decompressor.clone();
let def_decompressor = self.def_decompressor.clone();
let num_rep_values = self.num_rep_values;
let num_def_values = self.num_def_values;

async move {
let data = data.await?;
let mut data_iter = data.into_iter();

// Decompresses a single repetition/definition level buffer and validates the
// result: it must be a fixed-width block holding exactly `num_values` entries
// of 16 bits each. `level_type` is only used to label the error messages.
let decompress_levels = |compressed_bytes: Bytes,
                         decompressor: &Arc<dyn BlockDecompressor>,
                         num_values: u64,
                         level_type: &str|
 -> Result<ScalarBuffer<u16>> {
    let input = LanceBuffer::from_bytes(compressed_bytes, 1);

    // Anything other than a fixed-width block cannot be viewed as u16 levels.
    let DataBlock::FixedWidth(levels) = decompressor.decompress(input, num_values)? else {
        return Err(Error::InvalidInput {
            source: format!(
                "Expected fixed-width data block for {} levels",
                level_type
            )
            .into(),
            location: location!(),
        });
    };

    // Reject a block whose value count disagrees with the count the caller expected.
    if levels.num_values != num_values {
        return Err(Error::InvalidInput {
            source: format!(
                "Unexpected {} level count after decompression: expected {}, got {}",
                level_type, num_values, levels.num_values
            )
            .into(),
            location: location!(),
        });
    }

    // Levels are reinterpreted as u16 below, so the block must be 16 bits wide.
    if levels.bits_per_value != 16 {
        return Err(Error::InvalidInput {
            source: format!(
                "Unexpected {} level bit width after decompression: expected 16, got {}",
                level_type, levels.bits_per_value
            )
            .into(),
            location: location!(),
        });
    }

    Ok(levels.data.borrow_to_typed_slice::<u16>())
};

let rep = if has_rep {
let rep = data_iter.next().unwrap();
let rep = LanceBuffer::from_bytes(rep, 2);
let rep = rep.borrow_to_typed_slice::<u16>();
Some(rep)
if let Some(rep_decompressor) = rep_decompressor.as_ref() {
Some(decompress_levels(
rep,
rep_decompressor,
num_rep_values,
"repetition",
)?)
} else {
let rep = LanceBuffer::from_bytes(rep, 2);
let rep = rep.borrow_to_typed_slice::<u16>();
Some(rep)
}
} else {
None
};

let def = if has_def {
let def = data_iter.next().unwrap();
let def = LanceBuffer::from_bytes(def, 2);
let def = def.borrow_to_typed_slice::<u16>();
Some(def)
if let Some(def_decompressor) = def_decompressor.as_ref() {
Some(decompress_levels(
def,
def_decompressor,
num_def_values,
"definition",
)?)
} else {
let def = LanceBuffer::from_bytes(def, 2);
let def = def.borrow_to_typed_slice::<u16>();
Some(def)
}
} else {
None
};
Expand Down Expand Up @@ -3093,9 +3169,27 @@ impl StructuralPrimitiveFieldScheduler {
{
Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
} else {
let rep_decompressor = constant_layout
.rep_compression
.as_ref()
.map(|encoding| decompressors.create_block_decompressor(encoding))
.transpose()?
.map(Arc::from);

let def_decompressor = constant_layout
.def_compression
.as_ref()
.map(|encoding| decompressors.create_block_decompressor(encoding))
.transpose()?
.map(Arc::from);

Box::new(ComplexAllNullScheduler::new(
page_info.buffer_offsets_and_sizes.clone(),
def_meaning.into(),
rep_decompressor,
def_decompressor,
constant_layout.num_rep_values,
constant_layout.num_def_values,
)) as Box<dyn StructuralPageScheduler>
}
}
Expand Down Expand Up @@ -3833,6 +3927,27 @@ impl PrimitiveStructuralEncoder {
})
}

/// Compresses one buffer of rep/def levels for a complex all-null page.
///
/// The `u16` levels in `data` are wrapped in a 16-bit [`FixedWidthDataBlock`],
/// a block compressor is requested from `compression_strategy` for an unnamed,
/// non-nullable `UInt16` field, and the block is run through that compressor.
///
/// Returns the compressed buffer together with the encoding description that
/// `create_block_compressor` produced for it.
///
/// # Errors
///
/// Propagates any error from building the field, creating the compressor, or
/// compressing the block.
fn encode_complex_all_null_vals(
    data: &Arc<[u16]>,
    compression_strategy: &dyn CompressionStrategy,
) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
    let mut levels = FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_slice(data.clone()),
        bits_per_value: 16,
        num_values: data.len() as u64,
        block_info: BlockInfo::new(),
    };
    // NOTE(review): presumably fills in the statistics that the compression
    // strategy inspects when choosing a compressor - confirm against compute_stat.
    levels.compute_stat();
    let levels = DataBlock::FixedWidth(levels);

    let field = Field::new_arrow("", DataType::UInt16, false)?;
    let (compressor, encoding) = compression_strategy.create_block_compressor(&field, &levels)?;

    let compressed = compressor.compress(levels)?;
    Ok((compressed, encoding))
}

// Encodes a page where all values are null but we have rep/def
// information that we need to store (e.g. to distinguish between
// different kinds of null)
Expand All @@ -3841,21 +3956,59 @@ impl PrimitiveStructuralEncoder {
repdef: crate::repdef::SerializedRepDefs,
row_number: u64,
num_rows: u64,
version: LanceFileVersion,
compression_strategy: &dyn CompressionStrategy,
) -> Result<EncodedPage> {
// TODO: Actually compress repdef
let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
LanceBuffer::reinterpret_slice(rep.clone())
if version.resolve() < LanceFileVersion::V2_2 {
let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
LanceBuffer::reinterpret_slice(rep.clone())
} else {
LanceBuffer::empty()
};

let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
LanceBuffer::reinterpret_slice(def.clone())
} else {
LanceBuffer::empty()
};

let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
return Ok(EncodedPage {
column_idx,
data: vec![rep_bytes, def_bytes],
description: PageEncoding::Structural(description),
num_rows,
row_number,
});
}

let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
repdef.repetition_levels.as_ref()
{
let num_values = rep.len() as u64;
let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
(buffer, Some(encoding), num_values)
} else {
LanceBuffer::empty()
(LanceBuffer::empty(), None, 0)
};

let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
LanceBuffer::reinterpret_slice(def.clone())
let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
repdef.definition_levels.as_ref()
{
let num_values = def.len() as u64;
let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
(buffer, Some(encoding), num_values)
} else {
LanceBuffer::empty()
(LanceBuffer::empty(), None, 0)
};

let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
let description = ProtobufUtils21::compressed_all_null_constant_layout(
&repdef.def_meaning,
rep_encoding,
def_encoding,
num_rep_values,
num_def_values,
);
Ok(EncodedPage {
column_idx,
data: vec![rep_bytes, def_bytes],
Expand Down Expand Up @@ -4631,7 +4784,14 @@ impl PrimitiveStructuralEncoder {
// either have all empty lists or all null lists (or a mix). We still need to encode
// the rep/def information but we can skip the data encoding.
log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
return Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows);
return Self::encode_complex_all_null(
column_idx,
repdef,
row_number,
num_rows,
version,
compression_strategy.as_ref(),
);
}

let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
Expand All @@ -4656,7 +4816,14 @@ impl PrimitiveStructuralEncoder {
num_values,
num_rows
);
Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows)
Self::encode_complex_all_null(
column_idx,
repdef,
row_number,
num_rows,
version,
compression_strategy.as_ref(),
)
};
}

Expand Down Expand Up @@ -6511,4 +6678,89 @@ mod tests {
.with_page_sizes(vec![4096]);
check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}

/// Levels compressed by `encode_complex_all_null_vals` must decompress back to
/// the same u16 values when using the decompressor built from the returned
/// encoding.
#[test]
fn test_encode_decode_complex_all_null_vals_roundtrip() {
    use crate::compression::{
        DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
    };

    // 2048 levels cycling through 0, 1, 2, 3, 4.
    let raw_levels: Vec<u16> = (0..2048).map(|i| (i % 5) as u16).collect();
    let values: Arc<[u16]> = Arc::from(raw_levels);
    let expected_len = values.len() as u64;

    let compression_strategy = DefaultCompressionStrategy::default();
    let decompression_strategy = DefaultDecompressionStrategy::default();

    // Encode ...
    let (compressed_buf, encoding) =
        PrimitiveStructuralEncoder::encode_complex_all_null_vals(&values, &compression_strategy)
            .unwrap();

    // ... and decode using only the encoding description that came back.
    let decompressor = decompression_strategy
        .create_block_decompressor(&encoding)
        .unwrap();
    let decompressed = decompressor
        .decompress(compressed_buf, expected_len)
        .unwrap();

    let fixed_width = decompressed.as_fixed_width().unwrap();
    assert_eq!(fixed_width.num_values, expected_len);
    assert_eq!(fixed_width.bits_per_value, 16);

    let round_tripped = fixed_width.data.borrow_to_typed_slice::<u16>();
    assert_eq!(round_tripped.as_ref(), values.as_ref());
}

/// The complex all-null layout must only carry rep/def compression metadata
/// when the page is written for file version 2.2; a 2.1 page must leave the
/// compression fields unset and the value counts at zero.
#[tokio::test]
async fn test_complex_all_null_compression_gated_by_version() {
    use crate::format::pb21::page_layout::Layout;
    use arrow_array::ListArray;

    // Even rows are null lists, odd rows are empty lists, so the column holds
    // no items at all.
    let lists = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
        (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
    );
    let arr: ArrayRef = Arc::new(lists);

    let item_field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, true));
    let field = arrow_schema::Field::new("c", DataType::List(item_field), true);

    // 2.1: no compression metadata is recorded.
    let legacy_page = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
    let PageEncoding::Structural(legacy_structural) = &legacy_page.description else {
        panic!("Expected structural encoding");
    };
    let Layout::ConstantLayout(legacy_layout) = legacy_structural.layout.as_ref().unwrap() else {
        panic!("Expected constant layout");
    };
    assert!(legacy_layout.rep_compression.is_none());
    assert!(legacy_layout.def_compression.is_none());
    assert_eq!(legacy_layout.num_rep_values, 0);
    assert_eq!(legacy_layout.num_def_values, 0);

    // 2.2: the definition buffer is described by a compression encoding and a
    // non-zero value count.
    let compressed_page = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
    let PageEncoding::Structural(compressed_structural) = &compressed_page.description else {
        panic!("Expected structural encoding");
    };
    let Layout::ConstantLayout(compressed_layout) =
        compressed_structural.layout.as_ref().unwrap()
    else {
        panic!("Expected constant layout");
    };
    assert!(compressed_layout.def_compression.is_some());
    assert!(compressed_layout.num_def_values > 0);
}

#[tokio::test]
async fn test_complex_all_null_round_trip() {
use arrow_array::ListArray;

let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
(0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
);

let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
.await;
}
}
Loading