diff --git a/rust/arrow-pyarrow-integration-testing/src/lib.rs b/rust/arrow-pyarrow-integration-testing/src/lib.rs index 951d52983f1..5b5462d9c15 100644 --- a/rust/arrow-pyarrow-integration-testing/src/lib.rs +++ b/rust/arrow-pyarrow-integration-testing/src/lib.rs @@ -167,11 +167,22 @@ fn concatenate(array: PyObject, py: Python) -> PyResult { to_py(array, py) } +/// Converts to rust and back to python +#[pyfunction] +fn round_trip(array: PyObject, py: Python) -> PyResult { + // import + let array = to_rust(array, py)?; + + // export + to_py(array, py) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(double))?; m.add_wrapped(wrap_pyfunction!(double_py))?; m.add_wrapped(wrap_pyfunction!(substring))?; m.add_wrapped(wrap_pyfunction!(concatenate))?; + m.add_wrapped(wrap_pyfunction!(round_trip))?; Ok(()) } diff --git a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py index c81ac4c635f..c0de382057c 100644 --- a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -78,3 +78,22 @@ def test_time32_python(self): del expected # No leak of C++ memory self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + def test_list_array(self): + """ + Python -> Rust -> Python + """ + old_allocated = pyarrow.total_allocated_bytes() + a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64())) + b = arrow_pyarrow_integration_testing.round_trip(a) + + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + + diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs index a9b5d29ed79..450685bf522 100644 --- a/rust/arrow/src/array/ffi.rs +++ b/rust/arrow/src/array/ffi.rs @@ -25,12 +25,23 @@ 
use crate::{ }; use super::ArrayData; +use crate::datatypes::DataType; +use crate::ffi::ArrowArray; impl TryFrom for ArrayData { type Error = ArrowError; fn try_from(value: ffi::ArrowArray) -> Result { - let data_type = value.data_type()?; + let child_data = value.children()?; + + let child_type = if !child_data.is_empty() { + Some(child_data[0].data_type().clone()) + } else { + None + }; + + let data_type = value.data_type(child_type)?; + let len = value.len(); let offset = value.offset(); let null_count = value.null_count(); @@ -44,9 +55,7 @@ impl TryFrom for ArrayData { null_bit_buffer, offset, buffers, - // this is empty because ffi still does not support it. - // this is ok because FFI only supports datatypes without childs - vec![], + child_data, )) } } @@ -55,11 +64,45 @@ impl TryFrom for ffi::ArrowArray { type Error = ArrowError; fn try_from(value: ArrayData) -> Result { + // If parent is nullable, then children also must be nullable + // so we pass this nullable to the creation of the child data + let nullable = match value.data_type() { + DataType::List(field) => field.is_nullable(), + DataType::LargeList(field) => field.is_nullable(), + _ => false, + }; + let len = value.len(); let offset = value.offset() as usize; let null_count = value.null_count(); let buffers = value.buffers().to_vec(); let null_buffer = value.null_buffer().cloned(); + let child_data = value + .child_data() + .iter() + .map(|arr| { + let len = arr.len(); + let offset = arr.offset() as usize; + let null_count = arr.null_count(); + let buffers = arr.buffers().to_vec(); + let null_buffer = arr.null_buffer().cloned(); + + // Note: the nullable comes from the parent data. 
+ unsafe { + ArrowArray::try_new( + arr.data_type(), + len, + null_count, + null_buffer, + offset, + buffers, + vec![], + nullable, + ) + .expect("infallible") + } + }) + .collect::>(); unsafe { ffi::ArrowArray::try_new( @@ -69,9 +112,8 @@ impl TryFrom for ffi::ArrowArray { null_buffer, offset, buffers, - // this is empty because ffi still does not support it. - // this is ok because FFI only supports datatypes without childs - vec![], + child_data, + nullable, ) } } diff --git a/rust/arrow/src/datatypes/field.rs b/rust/arrow/src/datatypes/field.rs index 11fc31d6343..a471f12ef95 100644 --- a/rust/arrow/src/datatypes/field.rs +++ b/rust/arrow/src/datatypes/field.rs @@ -309,8 +309,8 @@ impl Field { }; Ok(Field { name, - nullable, data_type, + nullable, dict_id, dict_is_ordered, metadata, diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 6df5690fd41..3a6d031ebd8 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -77,16 +77,19 @@ To export an array, create an `ArrowArray` using [ArrowArray::try_new]. */ use std::{ + convert::TryFrom, ffi::CStr, ffi::CString, iter, - mem::size_of, + mem::{size_of, ManuallyDrop}, + os::raw::c_char, ptr::{self, NonNull}, sync::Arc, }; +use crate::array::ArrayData; use crate::buffer::Buffer; -use crate::datatypes::{DataType, TimeUnit}; +use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; @@ -117,20 +120,36 @@ unsafe extern "C" fn release_schema(schema: *mut FFI_ArrowSchema) { schema.release = None; } +struct SchemaPrivateData { + children: Box<[*mut FFI_ArrowSchema]>, +} + impl FFI_ArrowSchema { /// create a new [FFI_ArrowSchema] from a format. 
- fn new(format: &str) -> FFI_ArrowSchema { + fn new( + format: &str, + children: Vec<*mut FFI_ArrowSchema>, + nullable: bool, + ) -> FFI_ArrowSchema { + let children = children.into_boxed_slice(); + let n_children = children.len() as i64; + let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowSchema; + + let flags = if nullable { 2 } else { 0 }; + + let private_data = Box::new(SchemaPrivateData { children }); // FFI_ArrowSchema { format: CString::new(format).unwrap().into_raw(), - name: std::ptr::null_mut(), + // For child data a non null string is expected and is called item + name: CString::new("item").unwrap().into_raw(), metadata: std::ptr::null_mut(), - flags: 0, - n_children: 0, - children: ptr::null_mut(), + flags, + n_children, + children: children_ptr, dictionary: std::ptr::null_mut(), release: Some(release_schema), - private_data: std::ptr::null_mut(), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, } } @@ -168,7 +187,11 @@ impl Drop for FFI_ArrowSchema { /// maps a DataType `format` to a [DataType](arrow::datatypes::DataType). /// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings -fn to_datatype(format: &str) -> Result { +fn to_datatype( + format: &str, + child_type: Option, + schema: &FFI_ArrowSchema, +) -> Result { Ok(match format { "n" => DataType::Null, "b" => DataType::Boolean, @@ -193,6 +216,42 @@ fn to_datatype(format: &str) -> Result { "ttm" => DataType::Time32(TimeUnit::Millisecond), "ttu" => DataType::Time64(TimeUnit::Microsecond), "ttn" => DataType::Time64(TimeUnit::Nanosecond), + + // Note: The datatype null will only be created when called from ArrowArray::buffer_len + // at that point the child data is not yet known, but it is also not required to determine + // the buffer length of the list arrays. 
+ "+l" => { + let nullable = schema.flags == 2; + // Safety + // Should be set as this is expected from the C FFI definition + debug_assert!(!schema.name.is_null()); + let name = unsafe { CString::from_raw(schema.name as *mut c_char) } + .into_string() + .unwrap(); + // prevent a double free + let name = ManuallyDrop::new(name); + DataType::List(Box::new(Field::new( + &name, + child_type.unwrap_or(DataType::Null), + nullable, + ))) + } + "+L" => { + let nullable = schema.flags == 2; + // Safety + // Should be set as this is expected from the C FFI definition + debug_assert!(!schema.name.is_null()); + let name = unsafe { CString::from_raw(schema.name as *mut c_char) } + .into_string() + .unwrap(); + // prevent a double free + let name = ManuallyDrop::new(name); + DataType::LargeList(Box::new(Field::new( + &name, + child_type.unwrap_or(DataType::Null), + nullable, + ))) + } dt => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{}\" is not supported in the Rust implementation", @@ -228,6 +287,8 @@ fn from_datatype(datatype: &DataType) -> Result { DataType::Time32(TimeUnit::Millisecond) => "ttm", DataType::Time64(TimeUnit::Microsecond) => "ttu", DataType::Time64(TimeUnit::Nanosecond) => "ttn", + DataType::List(_) => "+l", + DataType::LargeList(_) => "+L", z => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" is still not supported in Rust implementation", @@ -275,9 +336,9 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { } // Variable-sized binaries: have two buffers. 
// "small": first buffer is i32, second is in bytes - (DataType::Utf8, 1) | (DataType::Binary, 1) => size_of::() * 8, - (DataType::Utf8, 2) | (DataType::Binary, 2) => size_of::() * 8, - (DataType::Utf8, _) | (DataType::Binary, _) => { + (DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) => size_of::() * 8, + (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => size_of::() * 8, + (DataType::Utf8, _) | (DataType::Binary, _) | (DataType::List(_), _)=> { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" expects 3 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", data_type, i @@ -285,9 +346,9 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { } // Variable-sized binaries: have two buffers. // LargeUtf8: first buffer is i64, second is in bytes - (DataType::LargeUtf8, 1) | (DataType::LargeBinary, 1) => size_of::() * 8, - (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => size_of::() * 8, - (DataType::LargeUtf8, _) | (DataType::LargeBinary, _) => { + (DataType::LargeUtf8, 1) | (DataType::LargeBinary, 1) | (DataType::LargeList(_), 1) => size_of::() * 8, + (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) | (DataType::LargeList(_), 2)=> size_of::() * 8, + (DataType::LargeUtf8, _) | (DataType::LargeBinary, _) | (DataType::LargeList(_), _)=> { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" expects 3 buffers, but requested {}. 
Please verify that the C data interface is correctly implemented.", data_type, i @@ -340,6 +401,7 @@ unsafe extern "C" fn release_array(array: *mut FFI_ArrowArray) { struct PrivateData { buffers: Vec>, buffers_ptr: Box<[*const std::os::raw::c_void]>, + children: Box<[*mut FFI_ArrowArray]>, } impl FFI_ArrowArray { @@ -353,6 +415,7 @@ impl FFI_ArrowArray { offset: i64, n_buffers: i64, buffers: Vec>, + children: Vec<*mut FFI_ArrowArray>, ) -> Self { let buffers_ptr = buffers .iter() @@ -364,11 +427,16 @@ impl FFI_ArrowArray { .collect::>(); let pointer = buffers_ptr.as_ptr() as *mut *const std::ffi::c_void; + let children = children.into_boxed_slice(); + let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowArray; + let n_children = children.len() as i64; + // create the private data owning everything. // any other data must be added here, e.g. via a struct, to track lifetime. let private_data = Box::new(PrivateData { buffers, buffers_ptr, + children, }); Self { @@ -376,9 +444,9 @@ impl FFI_ArrowArray { null_count, offset, n_buffers, - n_children: 0, + n_children, buffers: pointer, - children: std::ptr::null_mut(), + children: children_ptr, dictionary: std::ptr::null_mut(), release: Some(release_array), private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, @@ -425,6 +493,23 @@ unsafe fn create_buffer( NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, array)) } +unsafe fn create_child_arrays( + array: Arc, + schema: Arc, +) -> Result> { + (0..array.n_children as usize) + .map(|i| { + let arr_ptr = *array.children.add(i); + let schema_ptr = *schema.children.add(i); + let arrow_arr = ArrowArray::try_from_raw( + arr_ptr as *const FFI_ArrowArray, + schema_ptr as *const FFI_ArrowSchema, + )?; + ArrayData::try_from(arrow_arr) + }) + .collect() +} + impl Drop for FFI_ArrowArray { fn drop(&mut self) { match self.release { @@ -464,6 +549,7 @@ impl ArrowArray { /// creates a new `ArrowArray`. 
This is used to export to the C Data Interface. /// # Safety /// See safety of [ArrowArray] + #[allow(clippy::too_many_arguments)] pub unsafe fn try_new( data_type: &DataType, len: usize, @@ -471,7 +557,8 @@ impl ArrowArray { null_buffer: Option, offset: usize, buffers: Vec, - _child_data: Vec, + child_data: Vec, + nullable: bool, ) -> Result { let format = from_datatype(data_type)?; // * insert the null buffer at the start @@ -480,16 +567,26 @@ impl ArrowArray { .chain(buffers.iter().map(|b| Some(b.clone()))) .collect::>(); - let schema = Arc::new(FFI_ArrowSchema::new(&format)); + let mut ffi_arrow_arrays = Vec::with_capacity(child_data.len()); + let mut ffi_arrow_schemas = Vec::with_capacity(child_data.len()); + + child_data.into_iter().for_each(|arrow_arr| { + let (arr, schema) = ArrowArray::into_raw(arrow_arr); + ffi_arrow_arrays.push(arr as *mut FFI_ArrowArray); + ffi_arrow_schemas.push(schema as *mut FFI_ArrowSchema); + }); + + let schema = Arc::new(FFI_ArrowSchema::new(&format, ffi_arrow_schemas, nullable)); let array = Arc::new(FFI_ArrowArray::new( len as i64, null_count as i64, offset as i64, new_buffers.len() as i64, new_buffers, + ffi_arrow_arrays, )); - Ok(ArrowArray { schema, array }) + Ok(ArrowArray { array, schema }) } /// creates a new [ArrowArray] from two pointers. Used to import from the C Data Interface. @@ -519,7 +616,7 @@ impl ArrowArray { pub unsafe fn empty() -> Self { let schema = Arc::new(FFI_ArrowSchema::empty()); let array = Arc::new(FFI_ArrowArray::empty()); - ArrowArray { schema, array } + ArrowArray { array, schema } } /// exports [ArrowArray] to the C Data Interface @@ -542,19 +639,22 @@ impl ArrowArray { // for variable-sized buffers, such as the second buffer of a stringArray, we need // to fetch offset buffer's len to build the second buffer. fn buffer_len(&self, i: usize) -> Result { - let data_type = &self.data_type()?; + // Inner type is not important for buffer length. 
+ let data_type = &self.data_type(None)?; Ok(match (data_type, i) { (DataType::Utf8, 1) | (DataType::LargeUtf8, 1) | (DataType::Binary, 1) - | (DataType::LargeBinary, 1) => { + | (DataType::LargeBinary, 1) + | (DataType::List(_), 1) + | (DataType::LargeList(_), 1) => { // the len of the offset buffer (buffer 1) equals length + 1 let bits = bit_width(data_type, i)?; debug_assert_eq!(bits % 8, 0); (self.array.length as usize + 1) * (bits / 8) } - (DataType::Utf8, 2) | (DataType::Binary, 2) => { + (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -566,7 +666,9 @@ impl ArrowArray { // get last offset (unsafe { *offset_buffer.add(len / size_of::() - 1) }) as usize } - (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => { + (DataType::LargeUtf8, 2) + | (DataType::LargeBinary, 2) + | (DataType::LargeList(_), 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -607,6 +709,11 @@ impl ArrowArray { .collect() } + /// returns the child data of this array + pub fn children(&self) -> Result> { + unsafe { create_child_arrays(self.array.clone(), self.schema.clone()) } + } + /// the length of the array pub fn len(&self) -> usize { self.array.length as usize @@ -628,8 +735,8 @@ impl ArrowArray { } /// the data_type as declared in the schema - pub fn data_type(&self) -> Result { - to_datatype(self.schema.format()) + pub fn data_type(&self, child_type: Option) -> Result { + to_datatype(self.schema.format(), child_type, self.schema.as_ref()) } } @@ -638,11 +745,13 @@ mod tests { use super::*; use crate::array::{ make_array, Array, ArrayData, BinaryOffsetSizeTrait, BooleanArray, - GenericBinaryArray, GenericStringArray, Int32Array, StringOffsetSizeTrait, - 
Time32MillisecondArray, + GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, + OffsetSizeTrait, StringOffsetSizeTrait, Time32MillisecondArray, }; use crate::compute::kernels; + use crate::datatypes::Field; use std::convert::TryFrom; + use std::iter::FromIterator; #[test] fn test_round_trip() -> Result<()> { @@ -712,6 +821,73 @@ mod tests { test_generic_string::() } + fn test_generic_list() -> Result<()> { + // Construct a value array + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) + .build(); + + // Construct a buffer for value offsets, for the nested array: + // [[0, 1, 2], [3, 4, 5], [6, 7]] + let value_offsets = Buffer::from_iter( + [0usize, 3, 6, 8] + .iter() + .map(|i| Offset::from_usize(*i).unwrap()), + ); + + // Construct a list array from the above two + let list_data_type = match std::mem::size_of::() { + 4 => DataType::List(Box::new(Field::new("item", DataType::Int32, false))), + _ => { + DataType::LargeList(Box::new(Field::new("item", DataType::Int32, false))) + } + }; + + let list_data = ArrayData::builder(list_data_type) + .len(3) + .add_buffer(value_offsets) + .add_child_data(value_data) + .build(); + + // create an array natively + let array = GenericListArray::::from(list_data.clone()); + + // export it + let array = ArrowArray::try_from(array.data().clone())?; + + // (simulate consumer) import it + let data = ArrayData::try_from(array)?; + let array = make_array(data); + + // downcast + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + dbg!(&array); + + // verify + let expected = GenericListArray::::from(list_data); + assert_eq!(&array.value(0), &expected.value(0)); + assert_eq!(&array.value(1), &expected.value(1)); + assert_eq!(&array.value(2), &expected.value(2)); + + // (drop/release) + Ok(()) + } + + #[test] + fn test_list() -> Result<()> { + test_generic_list::() + } + + #[test] + fn test_large_list() -> Result<()> { + 
test_generic_list::() + } + fn test_generic_binary() -> Result<()> { // create an array natively let array: Vec> = vec![Some(b"a"), None, Some(b"aaa")];