From 44c8f5d318a14187937e63d925460d42fb4d94ff Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 20 Dec 2025 06:23:19 -0500 Subject: [PATCH] Fix panic after spill to disk --- .../group_values/multi_group_by/bytes_view.rs | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 31a152aa74174..845798aa04a4a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -451,21 +451,23 @@ impl ByteViewGroupValueBuilder { last_take_len: usize, ) -> Vec { let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1); + debug_assert!(last_remaining_buffer_index <= self.completed.len()); - // Take `0 ~ last_remaining_buffer_index - 1` buffers - if !self.completed.is_empty() || last_remaining_buffer_index == 0 { - take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index)); - } - - // Process the `last_remaining_buffer_index` buffers + // Process the `last_remaining_buffer_index` buffer before draining so the index is valid. let last_buffer = if last_remaining_buffer_index < self.completed.len() { // If it is in `completed`, simply clone self.completed[last_remaining_buffer_index].clone() } else { // If it is `in_progress`, copied `0 ~ offset` part + debug_assert!(last_take_len <= self.in_progress.len()); let taken_last_buffer = self.in_progress[0..last_take_len].to_vec(); Buffer::from_vec(taken_last_buffer) }; + + // Take `0 ~ last_remaining_buffer_index - 1` buffers + if last_remaining_buffer_index > 0 { + take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index)); + } take_buffers.push(last_buffer); take_buffers @@ -913,4 +915,28 @@ mod tests { let taken_array = builder.take_n(final_ones_to_append); assert_eq!(&taken_array, &input_array); } + + #[test] + fn test_byte_view_take_n_partial_completed_nonzero_index() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(30); + let input_array = StringViewArray::from(vec![ + Some("aaaaaaaaaaaaaa"), + Some("bbbbbbbbbbbbbb"), + Some("cccccccccccccc"), + Some("dddddddddddddd"), + Some("eeeeeeeeeeeeee"), + ]); + let input_array: ArrayRef = Arc::new(input_array); + + for row in 0..input_array.len() { + builder.append_val(&input_array, row).unwrap(); + } + + assert_eq!(builder.completed.len(), 2); + assert_eq!(builder.in_progress.len(), 14); + + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(0, 3)); + } }