Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 127 additions & 4 deletions rust/lance-encoding/src/encodings/physical/packed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,41 @@ enum FieldAccumulator {
Fixed {
builder: DataBlockBuilder,
bits_per_value: u64,
empty_value: DataBlock,
},
Variable32 {
builder: DataBlockBuilder,
empty_value: DataBlock,
},
Variable64 {
builder: DataBlockBuilder,
empty_value: DataBlock,
},
}

impl FieldAccumulator {
    /// Appends this accumulator's stored one-row `empty_value` placeholder to
    /// its builder.
    ///
    /// In full-zip variable packed decoding, rep/def may produce a visible row
    /// whose payload is empty (e.g. a null/invalid item). Each child still
    /// needs exactly one entry for such a row so that the row counts of all
    /// children stay aligned.
    fn append_empty(&mut self) {
        // Every variant carries the same (builder, empty_value) pair, so pick
        // it out once and issue a single append of row 0 of the placeholder.
        let (target, placeholder) = match self {
            Self::Fixed {
                builder,
                empty_value,
                ..
            }
            | Self::Variable32 {
                builder,
                empty_value,
            }
            | Self::Variable64 {
                builder,
                empty_value,
            } => (builder, empty_value),
        };
        target.append(placeholder, 0..1)
    }
}

impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
let num_values = data.num_values;
Expand Down Expand Up @@ -500,19 +526,40 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
location!(),
)
})?;
let empty_value = DataBlock::FixedWidth(FixedWidthDataBlock {
data: LanceBuffer::from(vec![0_u8; bytes_per_value as usize]),
bits_per_value: *bits_per_value,
num_values: 1,
block_info: BlockInfo::new(),
});
accumulators.push(FieldAccumulator::Fixed {
builder: DataBlockBuilder::with_capacity_estimate(estimate),
bits_per_value: *bits_per_value,
empty_value,
});
}
VariablePackedStructFieldKind::Variable {
bits_per_length, ..
} => match bits_per_length {
32 => accumulators.push(FieldAccumulator::Variable32 {
builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
empty_value: DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::empty(),
bits_per_offset: 32,
offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 0_u32]),
num_values: 1,
block_info: BlockInfo::new(),
}),
}),
64 => accumulators.push(FieldAccumulator::Variable64 {
builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
empty_value: DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::empty(),
bits_per_offset: 64,
offsets: LanceBuffer::reinterpret_vec(vec![0_u64, 0_u64]),
num_values: 1,
block_info: BlockInfo::new(),
}),
}),
_ => {
return Err(Error::invalid_input(
Expand All @@ -533,6 +580,12 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
location!(),
));
}
if row_start == row_end {
for accumulator in accumulators.iter_mut() {
accumulator.append_empty();
}
continue;
}
let mut cursor = row_start;
for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
match (&field.kind, accumulator) {
Expand All @@ -541,6 +594,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
FieldAccumulator::Fixed {
builder,
bits_per_value: acc_bits,
..
},
) => {
debug_assert_eq!(bits_per_value, acc_bits);
Expand All @@ -565,7 +619,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
VariablePackedStructFieldKind::Variable {
bits_per_length, ..
},
FieldAccumulator::Variable32 { builder },
FieldAccumulator::Variable32 { builder, .. },
) => {
if *bits_per_length != 32 {
return Err(Error::invalid_input(
Expand Down Expand Up @@ -607,7 +661,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
VariablePackedStructFieldKind::Variable {
bits_per_length, ..
},
FieldAccumulator::Variable64 { builder },
FieldAccumulator::Variable64 { builder, .. },
) => {
if *bits_per_length != 64 {
return Err(Error::invalid_input(
Expand Down Expand Up @@ -684,7 +738,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
decompressor,
},
},
FieldAccumulator::Variable32 { builder },
FieldAccumulator::Variable32 { builder, .. },
) => {
let DataBlock::VariableWidth(mut block) = builder.finish() else {
panic!("Expected variable-width datablock from builder");
Expand All @@ -702,7 +756,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
decompressor,
},
},
FieldAccumulator::Variable64 { builder },
FieldAccumulator::Variable64 { builder, .. },
) => {
let DataBlock::VariableWidth(mut block) = builder.finish() else {
panic!("Expected variable-width datablock from builder");
Expand Down Expand Up @@ -1116,4 +1170,73 @@ mod tests {

assert!(matches!(result, Err(Error::NotSupported { .. })));
}

#[test]
fn variable_packed_struct_decompress_empty_row() -> Result<()> {
    // Build one fixed-width (32-bit) child decoder and one variable-width
    // (32-bit lengths) child decoder from the default strategy.
    let strategy = DefaultDecompressionStrategy::default();
    let fixed_child = Arc::from(
        crate::compression::DecompressionStrategy::create_fixed_per_value_decompressor(
            &strategy,
            &ProtobufUtils21::flat(32, None),
        )?,
    );
    let variable_child = Arc::from(
        crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
            &strategy,
            &ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
        )?,
    );

    let field_decoders = vec![
        VariablePackedStructFieldDecoder {
            kind: VariablePackedStructFieldKind::Fixed {
                bits_per_value: 32,
                decompressor: fixed_child,
            },
        },
        VariablePackedStructFieldDecoder {
            kind: VariablePackedStructFieldKind::Variable {
                bits_per_length: 32,
                decompressor: variable_child,
            },
        },
    ];
    let decompressor = PackedStructVariablePerValueDecompressor::new(field_decoders);

    // Packed payload for the two non-empty rows:
    //   row 0 (bytes 0..9):  fixed = 1, length = 1, bytes = "a"
    //   row 2 (bytes 9..17): fixed = 2, length = 0
    // Row 1 contributes no bytes at all.
    let payload: Vec<u8> = [
        1_u32.to_le_bytes().as_slice(),
        1_u32.to_le_bytes().as_slice(),
        b"a".as_slice(),
        2_u32.to_le_bytes().as_slice(),
        0_u32.to_le_bytes().as_slice(),
    ]
    .concat();

    // The repeated offset (9, 9) makes the middle row zero-length.
    let packed_rows = VariableWidthBlock {
        data: LanceBuffer::from(payload),
        bits_per_offset: 32,
        offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 9_u32, 9_u32, 17_u32]),
        num_values: 3,
        block_info: BlockInfo::new(),
    };

    let DataBlock::Struct(unpacked) = decompressor.decompress(packed_rows)? else {
        panic!("expected struct output");
    };

    // The fixed child gets a zero placeholder for the empty row.
    let fixed_column = unpacked.children[0].as_fixed_width_ref().unwrap();
    assert_eq!(fixed_column.bits_per_value, 32);
    assert_eq!(
        fixed_column.data.borrow_to_typed_slice::<u32>().as_ref(),
        &[1, 0, 2]
    );

    // The variable child gets a zero-length entry for the empty row, so only
    // the single byte from row 0 is present in the data buffer.
    let variable_column = unpacked.children[1].as_variable_width_ref().unwrap();
    assert_eq!(variable_column.bits_per_offset, 32);
    assert_eq!(
        variable_column.offsets.borrow_to_typed_slice::<u32>().as_ref(),
        &[0_u32, 1_u32, 1_u32, 1_u32]
    );
    assert_eq!(variable_column.data.as_ref(), b"a");

    Ok(())
}
}