diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index dcf8284fd0b..e2acb6b0c1c 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -323,6 +323,12 @@ pub fn make_array(data: ArrayDataRef) -> ArrayRef { } } +/// Creates a new empty array +pub fn new_empty_array(data_type: &DataType) -> ArrayRef { + let data = ArrayData::new_empty(data_type); + make_array(Arc::new(data)) +} + /// Creates a new array from two FFI pointers. Used to import arrays from the C Data Interface /// # Safety /// Assumes that these pointers represent valid C Data Interfaces, both in memory @@ -375,3 +381,34 @@ where } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_empty_primitive() { + let array = new_empty_array(&DataType::Int32); + let a = array.as_any().downcast_ref::().unwrap(); + assert_eq!(a.len(), 0); + let expected: &[i32] = &[]; + assert_eq!(a.values(), expected); + } + + #[test] + fn test_empty_variable_sized() { + let array = new_empty_array(&DataType::Utf8); + let a = array.as_any().downcast_ref::().unwrap(); + assert_eq!(a.len(), 0); + assert_eq!(a.value_offset(0), 0i32); + } + + #[test] + fn test_empty_list_primitive() { + let data_type = + DataType::List(Box::new(Field::new("item", DataType::Int32, false))); + let array = new_empty_array(&data_type); + let a = array.as_any().downcast_ref::().unwrap(); + assert_eq!(a.len(), 0); + assert_eq!(a.value_offset(0), 0i32); + } +} diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 84e65e946d0..0f881a86f23 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -19,19 +19,15 @@ use std::any::Any; use std::convert::From; use std::fmt; use std::mem; -use std::sync::Arc; use num::Num; use super::{ array::print_long_array, make_array, raw_pointer::RawPtrBox, Array, ArrayDataRef, - ArrayRef, BinaryBuilder, BooleanBuilder, FixedSizeListBuilder, PrimitiveBuilder, - StringBuilder, + ArrayRef, }; -use crate::array::builder::GenericListBuilder; use crate::datatypes::ArrowNativeType; use crate::datatypes::*; -use crate::error::{ArrowError, Result}; /// trait declaring an offset size, relevant for i32 vs i64 array types. pub trait OffsetSizeTrait: ArrowNativeType + Num + Ord + std::ops::AddAssign { @@ -302,264 +298,6 @@ impl fmt::Debug for FixedSizeListArray { } } -macro_rules! build_empty_list_array_with_primitive_items { - ($item_type:ident, $offset_type:ident) => {{ - let values_builder = PrimitiveBuilder::<$item_type>::new(0); - let mut builder = - GenericListBuilder::<$offset_type, PrimitiveBuilder<$item_type>>::new( - values_builder, - ); - let empty_list_array = builder.finish(); - Ok(Arc::new(empty_list_array)) - }}; -} - -macro_rules! build_empty_list_array_with_non_primitive_items { - ($type_builder:ident, $offset_type:ident) => {{ - let values_builder = $type_builder::new(0); - let mut builder = - GenericListBuilder::<$offset_type, $type_builder>::new(values_builder); - let empty_list_array = builder.finish(); - Ok(Arc::new(empty_list_array)) - }}; -} - -pub fn build_empty_list_array( - item_type: DataType, -) -> Result { - match item_type { - DataType::UInt8 => { - build_empty_list_array_with_primitive_items!(UInt8Type, OffsetSize) - } - DataType::UInt16 => { - build_empty_list_array_with_primitive_items!(UInt16Type, OffsetSize) - } - DataType::UInt32 => { - build_empty_list_array_with_primitive_items!(UInt32Type, OffsetSize) - } - DataType::UInt64 => { - build_empty_list_array_with_primitive_items!(UInt64Type, OffsetSize) - } - DataType::Int8 => { - build_empty_list_array_with_primitive_items!(Int8Type, OffsetSize) - } - DataType::Int16 => { - build_empty_list_array_with_primitive_items!(Int16Type, OffsetSize) - } - DataType::Int32 => { - build_empty_list_array_with_primitive_items!(Int32Type, OffsetSize) - } - DataType::Int64 => { - build_empty_list_array_with_primitive_items!(Int64Type, OffsetSize) - } - DataType::Float32 => { - build_empty_list_array_with_primitive_items!(Float32Type, OffsetSize) - } - DataType::Float64 => { - build_empty_list_array_with_primitive_items!(Float64Type, OffsetSize) - } - DataType::Boolean => { - build_empty_list_array_with_non_primitive_items!(BooleanBuilder, OffsetSize) - } - DataType::Date32(_) => { - build_empty_list_array_with_primitive_items!(Date32Type, OffsetSize) - } - DataType::Date64(_) => { - build_empty_list_array_with_primitive_items!(Date64Type, OffsetSize) - } - DataType::Time32(TimeUnit::Second) => { - build_empty_list_array_with_primitive_items!(Time32SecondType, OffsetSize) - } - DataType::Time32(TimeUnit::Millisecond) => { - build_empty_list_array_with_primitive_items!( - Time32MillisecondType, - OffsetSize - ) - } - DataType::Time64(TimeUnit::Microsecond) => { - build_empty_list_array_with_primitive_items!( - Time64MicrosecondType, - OffsetSize - ) - } - DataType::Time64(TimeUnit::Nanosecond) => { - build_empty_list_array_with_primitive_items!(Time64NanosecondType, OffsetSize) - } - DataType::Duration(TimeUnit::Second) => { - build_empty_list_array_with_primitive_items!(DurationSecondType, OffsetSize) - } - DataType::Duration(TimeUnit::Millisecond) => { - build_empty_list_array_with_primitive_items!( - DurationMillisecondType, - OffsetSize - ) - } - DataType::Duration(TimeUnit::Microsecond) => { - build_empty_list_array_with_primitive_items!( - DurationMicrosecondType, - OffsetSize - ) - } - DataType::Duration(TimeUnit::Nanosecond) => { - build_empty_list_array_with_primitive_items!( - DurationNanosecondType, - OffsetSize - ) - } - DataType::Timestamp(TimeUnit::Second, _) => { - build_empty_list_array_with_primitive_items!(TimestampSecondType, OffsetSize) - } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - build_empty_list_array_with_primitive_items!( - TimestampMillisecondType, - OffsetSize - ) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_empty_list_array_with_primitive_items!( - TimestampMicrosecondType, - OffsetSize - ) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_empty_list_array_with_primitive_items!( - TimestampNanosecondType, - OffsetSize - ) - } - DataType::Utf8 => { - build_empty_list_array_with_non_primitive_items!(StringBuilder, OffsetSize) - } - DataType::Binary => { - build_empty_list_array_with_non_primitive_items!(BinaryBuilder, OffsetSize) - } - _ => Err(ArrowError::NotYetImplemented(format!( - "GenericListBuilder of type List({:?}) is not supported", - item_type - ))), - } -} - -macro_rules! build_empty_fixed_size_list_array_with_primitive_items { - ($item_type:ident) => {{ - let values_builder = PrimitiveBuilder::<$item_type>::new(0); - let mut builder = FixedSizeListBuilder::new(values_builder, 0); - let empty_list_array = builder.finish(); - Ok(Arc::new(empty_list_array)) - }}; -} - -macro_rules! build_empty_fixed_size_list_array_with_non_primitive_items { - ($type_builder:ident) => {{ - let values_builder = $type_builder::new(0); - let mut builder = FixedSizeListBuilder::new(values_builder, 0); - let empty_list_array = builder.finish(); - Ok(Arc::new(empty_list_array)) - }}; -} - -pub fn build_empty_fixed_size_list_array(item_type: DataType) -> Result { - match item_type { - DataType::UInt8 => { - build_empty_fixed_size_list_array_with_primitive_items!(UInt8Type) - } - DataType::UInt16 => { - build_empty_fixed_size_list_array_with_primitive_items!(UInt16Type) - } - DataType::UInt32 => { - build_empty_fixed_size_list_array_with_primitive_items!(UInt32Type) - } - DataType::UInt64 => { - build_empty_fixed_size_list_array_with_primitive_items!(UInt64Type) - } - DataType::Int8 => { - build_empty_fixed_size_list_array_with_primitive_items!(Int8Type) - } - DataType::Int16 => { - build_empty_fixed_size_list_array_with_primitive_items!(Int16Type) - } - DataType::Int32 => { - build_empty_fixed_size_list_array_with_primitive_items!(Int32Type) - } - DataType::Int64 => { - build_empty_fixed_size_list_array_with_primitive_items!(Int64Type) - } - DataType::Float32 => { - build_empty_fixed_size_list_array_with_primitive_items!(Float32Type) - } - DataType::Float64 => { - build_empty_fixed_size_list_array_with_primitive_items!(Float64Type) - } - DataType::Boolean => { - build_empty_fixed_size_list_array_with_non_primitive_items!(BooleanBuilder) - } - DataType::Date32(_) => { - build_empty_fixed_size_list_array_with_primitive_items!(Date32Type) - } - DataType::Date64(_) => { - build_empty_fixed_size_list_array_with_primitive_items!(Date64Type) - } - DataType::Time32(TimeUnit::Second) => { - build_empty_fixed_size_list_array_with_primitive_items!(Time32SecondType) - } - DataType::Time32(TimeUnit::Millisecond) => { - build_empty_fixed_size_list_array_with_primitive_items!(Time32MillisecondType) - } - DataType::Time64(TimeUnit::Microsecond) => { - build_empty_fixed_size_list_array_with_primitive_items!(Time64MicrosecondType) - } - DataType::Time64(TimeUnit::Nanosecond) => { - build_empty_fixed_size_list_array_with_primitive_items!(Time64NanosecondType) - } - DataType::Duration(TimeUnit::Second) => { - build_empty_fixed_size_list_array_with_primitive_items!(DurationSecondType) - } - DataType::Duration(TimeUnit::Millisecond) => { - build_empty_fixed_size_list_array_with_primitive_items!( - DurationMillisecondType - ) - } - DataType::Duration(TimeUnit::Microsecond) => { - build_empty_fixed_size_list_array_with_primitive_items!( - DurationMicrosecondType - ) - } - DataType::Duration(TimeUnit::Nanosecond) => { - build_empty_fixed_size_list_array_with_primitive_items!( - DurationNanosecondType - ) - } - DataType::Timestamp(TimeUnit::Second, _) => { - build_empty_fixed_size_list_array_with_primitive_items!(TimestampSecondType) - } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - build_empty_fixed_size_list_array_with_primitive_items!( - TimestampMillisecondType - ) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_empty_fixed_size_list_array_with_primitive_items!( - TimestampMicrosecondType - ) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_empty_fixed_size_list_array_with_primitive_items!( - TimestampNanosecondType - ) - } - DataType::Utf8 => { - build_empty_fixed_size_list_array_with_non_primitive_items!(StringBuilder) - } - DataType::Binary => { - build_empty_fixed_size_list_array_with_non_primitive_items!(BinaryBuilder) - } - _ => Err(ArrowError::NotYetImplemented(format!( - "FixedSizeListBuilder of type FixedSizeList({:?}) is not supported", - item_type - ))), - } -} - #[cfg(test)] mod tests { use crate::{ @@ -1066,37 +804,4 @@ mod tests { .build(); ListArray::from(list_data); } - - macro_rules! make_test_build_empty_list_array { - ($OFFSET:ident) => { - build_empty_list_array::<$OFFSET>(DataType::Boolean).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Int16).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Int32).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Int64).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Float32).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Float64).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Boolean).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Utf8).unwrap(); - build_empty_list_array::<$OFFSET>(DataType::Binary).unwrap(); - }; - } - - #[test] - fn test_build_empty_list_array() { - make_test_build_empty_list_array!(i32); - make_test_build_empty_list_array!(i64); - } - - #[test] - fn test_build_empty_fixed_size_list_array() { - build_empty_fixed_size_list_array(DataType::Boolean).unwrap(); - build_empty_fixed_size_list_array(DataType::Int16).unwrap(); - build_empty_fixed_size_list_array(DataType::Int32).unwrap(); - build_empty_fixed_size_list_array(DataType::Int64).unwrap(); - build_empty_fixed_size_list_array(DataType::Float32).unwrap(); - build_empty_fixed_size_list_array(DataType::Float64).unwrap(); - build_empty_fixed_size_list_array(DataType::Boolean).unwrap(); - build_empty_fixed_size_list_array(DataType::Utf8).unwrap(); - build_empty_fixed_size_list_array(DataType::Binary).unwrap(); - } } diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index 6541b8ff950..09fb019f314 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -21,9 +21,12 @@ use std::mem; use std::sync::Arc; -use crate::buffer::Buffer; -use crate::datatypes::DataType; +use crate::datatypes::{DataType, IntervalUnit}; use crate::{bitmap::Bitmap, datatypes::ArrowNativeType}; +use crate::{ + buffer::{Buffer, MutableBuffer}, + util::bit_util, +}; use super::equal::equal; @@ -41,6 +44,167 @@ pub(crate) fn count_nulls( } } +/// creates 2 [`MutableBuffer`]s with a given `capacity` (in slots). +#[inline] +pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuffer; 2] { + let empty_buffer = MutableBuffer::new(0); + match data_type { + DataType::Null => [empty_buffer, MutableBuffer::new(0)], + DataType::Boolean => { + let bytes = bit_util::ceil(capacity, 8); + let buffer = MutableBuffer::new(bytes); + [buffer, empty_buffer] + } + DataType::UInt8 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt16 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt32 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt64 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int8 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int16 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int32 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int64 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Float32 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Float64 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Date32(_) | DataType::Time32(_) => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Date64(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Timestamp(_, _) => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Interval(IntervalUnit::YearMonth) => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Interval(IntervalUnit::DayTime) => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Utf8 | DataType::Binary => { + let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); + // safety: `unsafe` code assumes that this buffer is initialized with one element + buffer.push(0i32); + [buffer, MutableBuffer::new(capacity * mem::size_of::())] + } + DataType::LargeUtf8 | DataType::LargeBinary => { + let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); + // safety: `unsafe` code assumes that this buffer is initialized with one element + buffer.push(0i64); + [buffer, MutableBuffer::new(capacity * mem::size_of::())] + } + DataType::List(_) => { + // offset buffer always starts with a zero + let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); + buffer.push(0i32); + [buffer, empty_buffer] + } + DataType::LargeList(_) => { + // offset buffer always starts with a zero + let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); + buffer.push(0i64); + [buffer, empty_buffer] + } + DataType::FixedSizeBinary(size) => { + [MutableBuffer::new(capacity * *size as usize), empty_buffer] + } + DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { + DataType::UInt8 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt16 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt32 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::UInt64 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int8 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int16 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int32 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Int64 => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + _ => unreachable!(), + }, + DataType::Float16 => unreachable!(), + DataType::FixedSizeList(_, _) | DataType::Struct(_) => { + [empty_buffer, MutableBuffer::new(0)] + } + DataType::Decimal(_, _) => [ + MutableBuffer::new(capacity * mem::size_of::()), + empty_buffer, + ], + DataType::Union(_) => unimplemented!(), + } +} + +/// Maps 2 [`MutableBuffer`]s into a vector of [Buffer]s whose size depends on `data_type`. +#[inline] +pub(crate) fn into_buffers( + data_type: &DataType, + buffer1: MutableBuffer, + buffer2: MutableBuffer, +) -> Vec { + match data_type { + DataType::Null | DataType::Struct(_) => vec![], + DataType::Utf8 + | DataType::Binary + | DataType::LargeUtf8 + | DataType::LargeBinary => vec![buffer1.into(), buffer2.into()], + _ => vec![buffer1.into()], + } +} + /// An generic representation of Arrow array data which encapsulates common attributes and /// operations for Arrow array. Specific operations for different arrays types (e.g., /// primitive, list, struct) are implemented in `Array`. @@ -246,6 +410,61 @@ impl ArrayData { assert_ne!(self.data_type, DataType::Boolean); &values.1[self.offset..] } + + /// Returns a new empty [ArrayData] valid for `data_type`. + pub(super) fn new_empty(data_type: &DataType) -> Self { + let buffers = new_buffers(data_type, 0); + let [buffer1, buffer2] = buffers; + let buffers = into_buffers(data_type, buffer1, buffer2); + + let child_data = match data_type { + DataType::Null + | DataType::Boolean + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Date32(_) + | DataType::Date64(_) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Timestamp(_, _) + | DataType::Utf8 + | DataType::Binary + | DataType::LargeUtf8 + | DataType::LargeBinary + | DataType::Interval(_) + | DataType::FixedSizeBinary(_) + | DataType::Decimal(_, _) => vec![], + DataType::List(field) => { + vec![Arc::new(Self::new_empty(field.data_type()))] + } + DataType::FixedSizeList(field, _) => { + vec![Arc::new(Self::new_empty(field.data_type()))] + } + DataType::LargeList(field) => { + vec![Arc::new(Self::new_empty(field.data_type()))] + } + DataType::Struct(fields) => fields + .iter() + .map(|field| Arc::new(Self::new_empty(field.data_type()))) + .collect(), + DataType::Union(_) => unimplemented!(), + DataType::Dictionary(_, data_type) => { + vec![Arc::new(Self::new_empty(data_type))] + } + DataType::Float16 => unreachable!(), + }; + + Self::new(data_type.clone(), 0, Some(0), None, 0, buffers, child_data) + } } impl PartialEq for ArrayData { diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index 9caf7f8e257..a3d619fe572 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -119,8 +119,6 @@ pub use self::array_binary::FixedSizeBinaryArray; pub use self::array_binary::LargeBinaryArray; pub use self::array_boolean::BooleanArray; pub use self::array_dictionary::DictionaryArray; -pub use self::array_list::build_empty_fixed_size_list_array; -pub use self::array_list::build_empty_list_array; pub use self::array_list::FixedSizeListArray; pub use self::array_list::LargeListArray; pub use self::array_list::ListArray; @@ -132,6 +130,7 @@ pub use self::array_union::UnionArray; pub use self::null::NullArray; pub use self::array::make_array; +pub use self::array::new_empty_array; pub type Int8Array = PrimitiveArray; pub type Int16Array = PrimitiveArray; diff --git a/rust/arrow/src/array/transform/mod.rs b/rust/arrow/src/array/transform/mod.rs index 4a5e829d6e7..2505068f10f 100644 --- a/rust/arrow/src/array/transform/mod.rs +++ b/rust/arrow/src/array/transform/mod.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::{mem::size_of, sync::Arc}; +use std::sync::Arc; use crate::{buffer::MutableBuffer, datatypes::DataType, util::bit_util}; -use super::{ArrayData, ArrayDataRef}; +use super::{ + data::{into_buffers, new_buffers}, + ArrayData, ArrayDataRef, +}; mod boolean; mod fixed_binary; @@ -56,14 +59,7 @@ struct _MutableArrayData<'a> { impl<'a> _MutableArrayData<'a> { fn freeze(self, dictionary: Option) -> ArrayData { - let buffers = match self.data_type { - DataType::Null | DataType::Struct(_) => vec![], - DataType::Utf8 - | DataType::Binary - | DataType::LargeUtf8 - | DataType::LargeBinary => vec![self.buffer1.into(), self.buffer2.into()], - _ => vec![self.buffer1.into()], - }; + let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2); let child_data = match self.data_type { DataType::Dictionary(_, _) => vec![dictionary.unwrap()], @@ -293,137 +289,7 @@ impl<'a> MutableArrayData<'a> { use_nulls = true; }; - let empty_buffer = MutableBuffer::new(0); - let [buffer1, buffer2] = match &data_type { - DataType::Null => [empty_buffer, MutableBuffer::new(0)], - DataType::Boolean => { - let bytes = bit_util::ceil(capacity, 8); - let buffer = MutableBuffer::from_len_zeroed(bytes); - [buffer, empty_buffer] - } - DataType::UInt8 => { - [MutableBuffer::new(capacity * size_of::()), empty_buffer] - } - DataType::UInt16 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::UInt32 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::UInt64 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int8 => { - [MutableBuffer::new(capacity * size_of::()), empty_buffer] - } - DataType::Int16 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int32 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int64 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Float32 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Float64 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Date32(_) | DataType::Time32(_) => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Date64(_) - | DataType::Time64(_) - | DataType::Duration(_) - | DataType::Timestamp(_, _) => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Interval(IntervalUnit::YearMonth) => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Interval(IntervalUnit::DayTime) => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Utf8 | DataType::Binary => { - let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.push(0i32); - [buffer, MutableBuffer::new(capacity * size_of::())] - } - DataType::LargeUtf8 | DataType::LargeBinary => { - let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.push(0i64); - [buffer, MutableBuffer::new(capacity * size_of::())] - } - DataType::List(_) => { - // offset buffer always starts with a zero - let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - buffer.push(0i32); - [buffer, empty_buffer] - } - DataType::LargeList(_) => { - // offset buffer always starts with a zero - let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - buffer.push(0i64); - [buffer, empty_buffer] - } - DataType::FixedSizeBinary(size) => { - [MutableBuffer::new(capacity * *size as usize), empty_buffer] - } - DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { - DataType::UInt8 => { - [MutableBuffer::new(capacity * size_of::()), empty_buffer] - } - DataType::UInt16 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::UInt32 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::UInt64 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int8 => { - [MutableBuffer::new(capacity * size_of::()), empty_buffer] - } - DataType::Int16 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int32 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - DataType::Int64 => [ - MutableBuffer::new(capacity * size_of::()), - empty_buffer, - ], - _ => unreachable!(), - }, - DataType::Float16 => unreachable!(), - DataType::Struct(_) => [empty_buffer, MutableBuffer::new(0)], - _ => { - todo!("Take and filter operations still not supported for this datatype") - } - }; + let [buffer1, buffer2] = new_buffers(data_type, capacity); let child_data = match &data_type { DataType::Null diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs index b8b6098a1c7..00ae4e83a53 100644 --- a/rust/arrow/src/record_batch.rs +++ b/rust/arrow/src/record_batch.rs @@ -93,6 +93,16 @@ impl RecordBatch { Ok(RecordBatch { schema, columns }) } + /// Creates a new empty [`RecordBatch`]. + pub fn new_empty(schema: SchemaRef) -> Self { + let columns = schema + .fields() + .iter() + .map(|field| new_empty_array(field.data_type())) + .collect(); + RecordBatch { schema, columns } + } + /// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error /// if any validation check fails. fn validate_new_batch( diff --git a/rust/datafusion/src/physical_plan/common.rs b/rust/datafusion/src/physical_plan/common.rs index 60ca857e99b..9de7ee2a32d 100644 --- a/rust/datafusion/src/physical_plan/common.rs +++ b/rust/datafusion/src/physical_plan/common.rs @@ -25,27 +25,9 @@ use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; -use array::{ - ArrayData, BooleanArray, Date32Array, DecimalArray, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, - Time32MillisecondArray, Time32SecondArray, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, -}; +use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use arrow::{ - array::{self, ArrayRef}, - datatypes::Schema, -}; -use arrow::{ - array::{ - build_empty_fixed_size_list_array, build_empty_list_array, Date64Array, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - }, - buffer::Buffer, - datatypes::{DataType, SchemaRef, TimeUnit}, -}; use futures::{Stream, TryStreamExt}; /// Stream of record batches @@ -120,175 +102,3 @@ pub fn build_file_list(dir: &str, filenames: &mut Vec, ext: &str) -> Res } Ok(()) } - -/// creates an empty record batch. -pub fn create_batch_empty(schema: &Schema) -> ArrowResult { - let columns = schema - .fields() - .iter() - .map(|f| match f.data_type() { - DataType::Float32 => { - Ok(Arc::new(Float32Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Float64 => { - Ok(Arc::new(Float64Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Int64 => { - Ok(Arc::new(Int64Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Int32 => { - Ok(Arc::new(Int32Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Int16 => { - Ok(Arc::new(Int16Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Int8 => { - Ok(Arc::new(Int8Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::UInt64 => { - Ok(Arc::new(UInt64Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::UInt32 => { - Ok(Arc::new(UInt32Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::UInt16 => { - Ok(Arc::new(UInt16Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::UInt8 => { - Ok(Arc::new(UInt8Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Utf8 => { - Ok(Arc::new(StringArray::from(vec![] as Vec<&str>)) as ArrayRef) - } - DataType::LargeUtf8 => { - Ok(Arc::new(LargeStringArray::from(vec![] as Vec<&str>)) as ArrayRef) - } - DataType::Boolean => { - Ok(Arc::new(BooleanArray::from(vec![] as Vec)) as ArrayRef) - } - DataType::Decimal(scale, precision) => { - let array_data = - ArrayData::builder(DataType::Decimal(*scale, *precision)) - .len(0) - .add_buffer(Buffer::from(&[])) - .build(); - - Ok(Arc::new(DecimalArray::from(array_data)) as ArrayRef) - } - DataType::Timestamp(TimeUnit::Nanosecond, tz) => Ok(Arc::new( - TimestampNanosecondArray::from_vec(vec![] as Vec, tz.clone()), - ) - as ArrayRef), - DataType::Timestamp(TimeUnit::Microsecond, tz) => Ok(Arc::new( - TimestampMicrosecondArray::from_vec(vec![] as Vec, tz.clone()), - ) - as ArrayRef), - DataType::Timestamp(TimeUnit::Millisecond, tz) => Ok(Arc::new( - TimestampMillisecondArray::from_vec(vec![] as Vec, tz.clone()), - ) - as ArrayRef), - DataType::Timestamp(TimeUnit::Second, tz) => Ok(Arc::new( - TimestampSecondArray::from_vec(vec![] as Vec, tz.clone()), - ) as ArrayRef), - DataType::Date32(_) => { - Ok(Arc::new(Date32Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Date64(_) => { - Ok(Arc::new(Date64Array::from(vec![] as Vec)) as ArrayRef) - } - DataType::Time32(unit) => match unit { - TimeUnit::Second => { - Ok(Arc::new(Time32SecondArray::from(vec![] as Vec)) as ArrayRef) - } - TimeUnit::Millisecond => { - Ok(Arc::new(Time32MillisecondArray::from(vec![] as Vec)) - as ArrayRef) - } - TimeUnit::Microsecond | TimeUnit::Nanosecond => { - Err(DataFusionError::NotImplemented(format!( - "Cannot convert datatype {:?} to array", - f.data_type() - ))) - } - }, - DataType::Time64(unit) => match unit { - TimeUnit::Second | TimeUnit::Millisecond => { - Err(DataFusionError::NotImplemented(format!( - "Cannot convert datatype {:?} to array", - f.data_type() - ))) - } - TimeUnit::Microsecond => { - Ok(Arc::new(Time64MicrosecondArray::from(vec![] as Vec)) - as ArrayRef) - } - TimeUnit::Nanosecond => { - Ok(Arc::new(Time64NanosecondArray::from(vec![] as Vec)) - as ArrayRef) - } - }, - DataType::List(nested_type) => Ok(build_empty_list_array::( - nested_type.data_type().clone(), - )?), - DataType::LargeList(nested_type) => Ok(build_empty_list_array::( - nested_type.data_type().clone(), - )?), - DataType::FixedSizeList(nested_type, _) => Ok( - build_empty_fixed_size_list_array(nested_type.data_type().clone())?, - ), - _ => Err(DataFusionError::NotImplemented(format!( - "Cannot convert datatype {:?} to array", - f.data_type() - ))), - }) - .collect::>() - .map_err(DataFusionError::into_arrow_external_error)?; - - RecordBatch::try_new(Arc::new(schema.to_owned()), columns) -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::Field; - - #[test] - fn test_create_batch_empty() { - let schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::UInt32, false), - Field::new("c3", DataType::Int8, false), - Field::new("c4", DataType::Int16, false), - Field::new("c5", DataType::Int32, false), - Field::new("c6", DataType::Int64, false), - Field::new("c7", DataType::UInt8, false), - Field::new("c8", DataType::UInt16, false), - Field::new("c9", DataType::UInt32, false), - Field::new("c10", DataType::UInt64, false), - Field::new("c11", DataType::Float32, false), - Field::new("c12", DataType::Float64, false), - Field::new("c13", DataType::Utf8, false), - Field::new("c14", DataType::Decimal(10, 10), false), - Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false), - Field::new( - "c16", - DataType::Timestamp(TimeUnit::Microsecond, None), - false, - ), - Field::new( - "c17", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new( - "c18", - DataType::Timestamp(TimeUnit::Nanosecond, None), - false, - ), - Field::new("c19", DataType::Boolean, false), - ]); - - let batch = create_batch_empty(&schema).unwrap(); - assert_eq!(batch.columns().len(), 19); - } -} diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index de5c425b94e..c781bb67898 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -43,7 +43,7 @@ use arrow::{ use pin_project_lite::pin_project; use super::{ - common, expressions::Column, group_scalar::GroupByScalar, RecordBatchStream, + expressions::Column, group_scalar::GroupByScalar, RecordBatchStream, SendableRecordBatchStream, }; use ahash::RandomState; @@ -824,7 +824,7 @@ fn create_batch_from_map( let columns = concatenate(arrays)?; RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)? } else { - common::create_batch_empty(output_schema)? + RecordBatch::new_empty(Arc::new(output_schema.to_owned())) }; Ok(batch) } diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 4bacb504424..a651badd7d6 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::vec::Vec; use arrow::array::{ - build_empty_list_array, Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, + new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, Int32Array, Int64Array, OffsetSizeTrait, PrimitiveArray, @@ -798,8 +798,7 @@ impl ArrayReader for ListArrayReader { let item_type = self.item_reader.get_data_type().clone(); if next_batch_array.len() == 0 { - return build_empty_list_array::(item_type) - .map_err(|err| ParquetError::General(err.to_string())); + return Ok(new_empty_array(&self.data_type)); } let def_levels = self .item_reader