diff --git a/rust/lance-encoding/src/encodings/physical/packed.rs b/rust/lance-encoding/src/encodings/physical/packed.rs index 31ce7acd1e4..0ad7295011f 100644 --- a/rust/lance-encoding/src/encodings/physical/packed.rs +++ b/rust/lance-encoding/src/encodings/physical/packed.rs @@ -439,15 +439,41 @@ enum FieldAccumulator { Fixed { builder: DataBlockBuilder, bits_per_value: u64, + empty_value: DataBlock, }, Variable32 { builder: DataBlockBuilder, + empty_value: DataBlock, }, Variable64 { builder: DataBlockBuilder, + empty_value: DataBlock, }, } +impl FieldAccumulator { + // In full-zip variable packed decoding, rep/def may produce a visible row + // with an empty payload (e.g. null/invalid item). We still need to append + // one placeholder per child so child row counts remain aligned. + fn append_empty(&mut self) { + match self { + Self::Fixed { + builder, + empty_value, + .. + } => builder.append(empty_value, 0..1), + Self::Variable32 { + builder, + empty_value, + } => builder.append(empty_value, 0..1), + Self::Variable64 { + builder, + empty_value, + } => builder.append(empty_value, 0..1), + } + } +} + impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> { let num_values = data.num_values; @@ -500,9 +526,16 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { location!(), ) })?; + let empty_value = DataBlock::FixedWidth(FixedWidthDataBlock { + data: LanceBuffer::from(vec![0_u8; bytes_per_value as usize]), + bits_per_value: *bits_per_value, + num_values: 1, + block_info: BlockInfo::new(), + }); accumulators.push(FieldAccumulator::Fixed { builder: DataBlockBuilder::with_capacity_estimate(estimate), bits_per_value: *bits_per_value, + empty_value, }); } VariablePackedStructFieldKind::Variable { @@ -510,9 +543,23 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { } => match bits_per_length { 32 => 
accumulators.push(FieldAccumulator::Variable32 { builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64), + empty_value: DataBlock::VariableWidth(VariableWidthBlock { + data: LanceBuffer::empty(), + bits_per_offset: 32, + offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 0_u32]), + num_values: 1, + block_info: BlockInfo::new(), + }), }), 64 => accumulators.push(FieldAccumulator::Variable64 { builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64), + empty_value: DataBlock::VariableWidth(VariableWidthBlock { + data: LanceBuffer::empty(), + bits_per_offset: 64, + offsets: LanceBuffer::reinterpret_vec(vec![0_u64, 0_u64]), + num_values: 1, + block_info: BlockInfo::new(), + }), }), _ => { return Err(Error::invalid_input( @@ -533,6 +580,12 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { location!(), )); } + if row_start == row_end { + for accumulator in accumulators.iter_mut() { + accumulator.append_empty(); + } + continue; + } let mut cursor = row_start; for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) { match (&field.kind, accumulator) { @@ -541,6 +594,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { FieldAccumulator::Fixed { builder, bits_per_value: acc_bits, + .. }, ) => { debug_assert_eq!(bits_per_value, acc_bits); @@ -565,7 +619,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { VariablePackedStructFieldKind::Variable { bits_per_length, .. }, - FieldAccumulator::Variable32 { builder }, + FieldAccumulator::Variable32 { builder, .. }, ) => { if *bits_per_length != 32 { return Err(Error::invalid_input( @@ -607,7 +661,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { VariablePackedStructFieldKind::Variable { bits_per_length, .. }, - FieldAccumulator::Variable64 { builder }, + FieldAccumulator::Variable64 { builder, .. 
}, ) => { if *bits_per_length != 64 { return Err(Error::invalid_input( @@ -684,7 +738,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { decompressor, }, }, - FieldAccumulator::Variable32 { builder }, + FieldAccumulator::Variable32 { builder, .. }, ) => { let DataBlock::VariableWidth(mut block) = builder.finish() else { panic!("Expected variable-width datablock from builder"); @@ -702,7 +756,7 @@ impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor { decompressor, }, }, - FieldAccumulator::Variable64 { builder }, + FieldAccumulator::Variable64 { builder, .. }, ) => { let DataBlock::VariableWidth(mut block) = builder.finish() else { panic!("Expected variable-width datablock from builder"); @@ -1116,4 +1170,73 @@ mod tests { assert!(matches!(result, Err(Error::NotSupported { .. }))); } + + #[test] + fn variable_packed_struct_decompress_empty_row() -> Result<()> { + let strategy = DefaultDecompressionStrategy::default(); + let fixed_decompressor = Arc::from( + crate::compression::DecompressionStrategy::create_fixed_per_value_decompressor( + &strategy, + &ProtobufUtils21::flat(32, None), + )?, + ); + let variable_decompressor = Arc::from( + crate::compression::DecompressionStrategy::create_variable_per_value_decompressor( + &strategy, + &ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None), + )?, + ); + + let decompressor = PackedStructVariablePerValueDecompressor::new(vec![ + VariablePackedStructFieldDecoder { + kind: VariablePackedStructFieldKind::Fixed { + bits_per_value: 32, + decompressor: fixed_decompressor, + }, + }, + VariablePackedStructFieldDecoder { + kind: VariablePackedStructFieldKind::Variable { + bits_per_length: 32, + decompressor: variable_decompressor, + }, + }, + ]); + + let mut row_data = Vec::new(); + row_data.extend_from_slice(&1_u32.to_le_bytes()); + row_data.extend_from_slice(&1_u32.to_le_bytes()); + row_data.extend_from_slice(b"a"); + 
+ row_data.extend_from_slice(&2_u32.to_le_bytes()); + row_data.extend_from_slice(&0_u32.to_le_bytes()); + + let input = VariableWidthBlock { + data: LanceBuffer::from(row_data), + bits_per_offset: 32, + offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 9_u32, 9_u32, 17_u32]), + num_values: 3, + block_info: BlockInfo::new(), + }; + + let decoded = decompressor.decompress(input)?; + let DataBlock::Struct(decoded_struct) = decoded else { + panic!("expected struct output"); + }; + + let fixed = decoded_struct.children[0].as_fixed_width_ref().unwrap(); + assert_eq!(fixed.bits_per_value, 32); + assert_eq!( + fixed.data.borrow_to_typed_slice::<u32>().as_ref(), + &[1, 0, 2] + ); + + let variable = decoded_struct.children[1].as_variable_width_ref().unwrap(); + assert_eq!(variable.bits_per_offset, 32); + assert_eq!( + variable.offsets.borrow_to_typed_slice::<u32>().as_ref(), + &[0_u32, 1_u32, 1_u32, 1_u32] + ); + assert_eq!(variable.data.as_ref(), b"a"); + + Ok(()) + } }