diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs new file mode 100644 index 00000000000..50506caf622 --- /dev/null +++ b/rust/arrow/src/array/ffi.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains functionality to load an ArrayData from the C Data Interface + +use std::convert::TryFrom; + +use crate::{ + error::{ArrowError, Result}, + ffi, +}; + +use super::ArrayData; + +impl TryFrom for ArrayData { + type Error = ArrowError; + + fn try_from(value: ffi::ArrowArray) -> Result { + let data_type = value.data_type()?; + let len = value.len(); + let offset = value.offset(); + let null_count = value.null_count(); + let buffers = value.buffers()?; + let null_bit_buffer = value.null_bit_buffer(); + + // todo: no child data yet... + Ok(ArrayData::new( + data_type, + len, + Some(null_count), + null_bit_buffer, + offset, + buffers, + vec![], + )) + } +} + +impl TryFrom for ffi::ArrowArray { + type Error = ArrowError; + + fn try_from(value: ArrayData) -> Result { + let len = value.len; + let offset = value.offset as usize; + let null_count = value.null_count; + let buffers = value.buffers().to_vec(); + let null_buffer = value.null_buffer().cloned(); + + // todo: no child data yet... + ffi::ArrowArray::try_new( + value.data_type(), + len, + null_count, + null_buffer, + offset, + buffers, + vec![], + ) + } +} + +#[cfg(test)] +mod tests { + use crate::error::Result; + use crate::{ + array::{Array, ArrayData, Int64Array, UInt32Array, UInt64Array}, + ffi::ArrowArray, + }; + use std::convert::TryFrom; + + fn test_round_trip(expected: &ArrayData) -> Result<()> { + // create a `ArrowArray` from the data. + // here we take ownership (increase ref count) + let d1 = ArrowArray::try_from(expected.clone())?; + + // here we export the array as 2 pointers. We have no control over ownership if it was not for + // the release mechanism. + let (array, schema) = ArrowArray::to_raw(d1); + + // simulate an external consumer by being the consumer + let d1 = unsafe { ArrowArray::try_from_raw(array, schema) }?; + + let result = &ArrayData::try_from(d1)?; + + assert_eq!(result, expected); + Ok(()) + } + + #[test] + fn test_u32() -> Result<()> { + let data = UInt32Array::from(vec![2]).data(); + test_round_trip(data.as_ref()) + } + + #[test] + fn test_u64() -> Result<()> { + let data = UInt64Array::from(vec![2]).data(); + test_round_trip(data.as_ref()) + } + + #[test] + fn test_i64() -> Result<()> { + let data = Int64Array::from(vec![2]).data(); + test_round_trip(data.as_ref()) + } +} diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index 9debbb6b0ad..31dbf6f9ee5 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -254,3 +254,5 @@ pub use self::cast::{ as_boolean_array, as_dictionary_array, as_null_array, as_primitive_array, as_string_array, }; + +pub mod ffi; diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index aa1d7fe7f56..62ca5b5af3a 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -21,13 +21,17 @@ #[cfg(feature = "simd")] use packed_simd::u8x64; +use crate::{ + bytes::{Bytes, Deallocation}, + ffi, +}; + use std::cmp; use std::convert::AsRef; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write}; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::ptr::NonNull; use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; @@ -38,92 +42,16 @@ use crate::util::bit_util; #[cfg(feature = "simd")] use std::borrow::BorrowMut; -/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte -/// boundary. Buffer is immutable. -#[derive(PartialEq, Debug)] +/// [`Buffer`] is an immutable contiguous memory region. +#[derive(PartialEq, Clone, Debug)] pub struct Buffer { /// Reference-counted pointer to the internal byte buffer. - data: Arc, + data: Arc, /// The offset into the buffer. offset: usize, } -struct BufferData { - /// The raw pointer into the buffer bytes - ptr: *const u8, - - /// The length (num of bytes) of the buffer. The region `[0, len)` of the buffer - /// is occupied with meaningful data, while the rest `[len, capacity)` is the - /// unoccupied region. - len: usize, - - /// Whether this piece of memory is owned by this object - owned: bool, - - /// The capacity (num of bytes) of the buffer - /// Invariant: len <= capacity - capacity: usize, -} - -impl PartialEq for BufferData { - fn eq(&self, other: &BufferData) -> bool { - if self.capacity != other.capacity { - return false; - } - - self.data() == other.data() - } -} - -/// Release the underlying memory when the current buffer goes out of scope -impl Drop for BufferData { - fn drop(&mut self) { - if self.is_allocated() && self.owned { - unsafe { memory::free_aligned(self.ptr as *mut u8, self.capacity) }; - } - } -} - -impl Debug for BufferData { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!( - f, - "BufferData {{ ptr: {:?}, len: {}, capacity: {}, data: ", - self.ptr, self.len, self.capacity - )?; - - f.debug_list().entries(self.data().iter()).finish()?; - - write!(f, " }}") - } -} - -impl BufferData { - fn data(&self) -> &[u8] { - if !self.is_allocated() { - &[] - } else { - unsafe { std::slice::from_raw_parts(self.ptr, self.len) } - } - } - - fn is_zst(&self) -> bool { - self.ptr == BUFFER_INIT.as_ptr() as _ - } - - fn is_allocated(&self) -> bool { - !(self.is_zst() || self.ptr.is_null()) - } -} - -/// -/// SAFETY: (vcq): -/// As you can see this is global and lives as long as the program lives. -/// This is used for lazy allocation in the further steps of buffer allocations. -/// Pointer below is well-aligned, only used for ZSTs and discarded afterwards. -const BUFFER_INIT: NonNull = NonNull::dangling(); - impl Buffer { /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. @@ -139,7 +67,7 @@ impl Buffer { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, true) + Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) } /// Creates a buffer from an existing memory region (must already be byte-aligned), this @@ -149,70 +77,52 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** + /// * `data` - An [ffi::FFI_ArrowArray] with the data /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn from_unowned(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, false) + /// bytes and that the foreign deallocator frees the region. + pub unsafe fn from_unowned( + ptr: *const u8, + len: usize, + data: ffi::FFI_ArrowArray, + ) -> Self { + Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data)) } - /// Creates a buffer from an existing memory region (must already be byte-aligned). - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in bytes - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** - /// * `owned` - Whether the raw parts is owned by this `Buffer`. If true, this `Buffer` will - /// free this memory when dropped, otherwise it will skip freeing the raw parts. - /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + /// Auxiliary method to create a new Buffer unsafe fn build_with_arguments( ptr: *const u8, len: usize, - capacity: usize, - owned: bool, + deallocation: Deallocation, ) -> Self { - assert!( - memory::is_aligned(ptr, memory::ALIGNMENT), - "memory not aligned" - ); - let buf_data = BufferData { - ptr, - len, - capacity, - owned, - }; + let bytes = Bytes::new(ptr, len, deallocation); Buffer { - data: Arc::new(buf_data), + data: Arc::new(bytes), offset: 0, } } /// Returns the number of bytes in the buffer pub fn len(&self) -> usize { - self.data.len - self.offset + self.data.len() - self.offset } - /// Returns the capacity of this buffer + /// Returns the capacity of this buffer. + /// For exernally owned buffers, this returns zero pub fn capacity(&self) -> usize { - self.data.capacity + self.data.capacity() } /// Returns whether the buffer is empty. pub fn is_empty(&self) -> bool { - self.data.len - self.offset == 0 + self.data.len() - self.offset == 0 } /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - &self.data.data()[self.offset..] + &self.data.as_slice()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. @@ -232,7 +142,7 @@ impl Buffer { /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. pub fn raw_data(&self) -> *const u8 { - unsafe { self.data.ptr.add(self.offset) } + unsafe { self.data.raw_data().add(self.offset) } } /// View buffer as typed slice. @@ -253,20 +163,6 @@ impl Buffer { self.len() / mem::size_of::(), ) } - - /// Returns an empty buffer. - pub fn empty() -> Self { - unsafe { Self::from_raw_parts(BUFFER_INIT.as_ptr() as _, 0, 0) } - } -} - -impl Clone for Buffer { - fn clone(&self) -> Buffer { - Buffer { - data: self.data.clone(), - offset: self.offset, - } - } } /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly @@ -667,11 +563,8 @@ impl MutableBuffer { /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { - let buffer_data = BufferData { - ptr: self.data, - len: self.len, - capacity: self.capacity, - owned: true, + let buffer_data = unsafe { + Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) }; std::mem::forget(self); Buffer { diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs new file mode 100644 index 00000000000..f7bdefdf3f7 --- /dev/null +++ b/rust/arrow/src/bytes.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains an implementation of a contiguous immutable memory region that knows +//! how to de-allocate itself, [`Bytes`]. +//! Note that this is a low-level functionality of this crate. + +use core::slice; +use std::{fmt::Debug, fmt::Formatter}; + +use crate::{ffi, memory}; + +/// Mode of deallocating memory regions +pub enum Deallocation { + /// Native deallocation, using Rust deallocator with Arrow-specific memory aligment + Native(usize), + /// Foreign interface, via a callback + Foreign(ffi::FFI_ArrowArray), +} + +impl Debug for Deallocation { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Deallocation::Native(capacity) => { + write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + } + Deallocation::Foreign(_) => { + write!(f, "Deallocation::Foreign {{ capacity: unknown }}") + } + } + } +} + +/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. +/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's +/// global allocator nor u8 aligmnent. +/// +/// In the most common case, this buffer is allocated using [`allocate_aligned`](memory::allocate_aligned) +/// and deallocated accordingly [`free_aligned`](memory::free_aligned). +/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the +/// foreign deallocator to deallocate the region when it is no longer needed. +pub struct Bytes { + /// The raw pointer to be begining of the region + ptr: *const u8, + + /// The number of bytes visible to this region. This is always smaller than its capacity (when avaliable). + len: usize, + + /// how to deallocate this region + deallocation: Deallocation, +} + +impl Bytes { + /// Takes ownership of an allocated memory region, + /// + /// # Arguments + /// + /// * `ptr` - Pointer to raw parts + /// * `len` - Length of raw parts in **bytes** + /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** + /// + /// # Safety + /// + /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` + /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + pub unsafe fn new(ptr: *const u8, len: usize, deallocation: Deallocation) -> Bytes { + Bytes { + ptr, + len, + deallocation, + } + } + + #[inline] + pub fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr, self.len) } + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + pub fn raw_data(&self) -> *const u8 { + self.ptr + } + + #[inline] + pub fn raw_data_mut(&mut self) -> *mut u8 { + self.ptr as *mut u8 + } + + pub fn capacity(&self) -> usize { + match self.deallocation { + Deallocation::Native(capacity) => capacity, + // we cannot determine this in general, + // and thus we state that this is externally-owned memory + Deallocation::Foreign(_) => 0, + } + } +} + +impl Drop for Bytes { + #[inline] + fn drop(&mut self) { + match &self.deallocation { + Deallocation::Native(capacity) => { + if !self.ptr.is_null() { + unsafe { memory::free_aligned(self.ptr as *mut u8, *capacity) }; + } + } + // foreign interface knows how to deallocate itself. + Deallocation::Foreign(_) => (), + } + } +} + +impl PartialEq for Bytes { + fn eq(&self, other: &Bytes) -> bool { + self.as_slice() == other.as_slice() + } +} + +impl Debug for Bytes { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?; + + f.debug_list().entries(self.as_slice().iter()).finish()?; + + write!(f, " }}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dealloc_native() { + let capacity = 5; + let a = memory::allocate_aligned(capacity); + // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior + unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; + } +} diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index f428b4f0f99..655ae940897 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -35,6 +35,8 @@ pub enum ArrowError { IoError(String), InvalidArgumentError(String), ParquetError(String), + /// Error during import or export to/from the C Data Interface + CDataInterface(String), DictionaryKeyOverflowError, } @@ -97,6 +99,9 @@ impl Display for ArrowError { ArrowError::ParquetError(desc) => { write!(f, "Parquet argument error: {}", desc) } + ArrowError::CDataInterface(desc) => { + write!(f, "C Data interface error: {}", desc) + } ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs new file mode 100644 index 00000000000..b29af311840 --- /dev/null +++ b/rust/arrow/src/ffi.rs @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains declarations of the C Data Interface and how to import them. +//! Generally, this module is divided in two main interfaces: +//! One interface maps C ABI to native Rust types, i.e. convert c-pointers, c_char, to native rust. +//! This is handled by [FFI_ArrowSchema] and [FFI_ArrowArray]. +//! The other interface maps native Rust types to the Rust-specific implementation of Arrow such as `format` to [Datatype], +//! `Buffer`, etc. This is handled by [ArrowArray]. + +use std::{ffi::CStr, ffi::CString, iter, mem, ptr}; + +use crate::datatypes::DataType; +use crate::error::{ArrowError, Result}; +use crate::{buffer::Buffer, memory}; + +/// ABI-compatible struct for FFI_ArrowSchema from C Data Interface +/// See https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +/// This was created by bindgen +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_ArrowSchema { + format: *const ::std::os::raw::c_char, + name: *const ::std::os::raw::c_char, + metadata: *const ::std::os::raw::c_char, + flags: i64, + n_children: i64, + children: *mut *mut FFI_ArrowSchema, + dictionary: *mut FFI_ArrowSchema, + release: ::std::option::Option, + private_data: *mut ::std::os::raw::c_void, +} + +impl FFI_ArrowSchema { + /// create a new [FFI_ArrowSchema] from a format. Atm we are only using `format`. + pub(crate) fn new(format: &str) -> FFI_ArrowSchema { + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema + FFI_ArrowSchema { + format: CString::new(format).unwrap().into_raw(), + name: std::ptr::null_mut(), + metadata: std::ptr::null_mut(), + flags: 0, + n_children: 0, + children: ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } + + pub(crate) fn format(&self) -> &str { + unsafe { CStr::from_ptr(self.format) } + .to_str() + .expect("The external API has a non-utf8 as format") + } +} + +impl Drop for FFI_ArrowSchema { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +/// maps a DataType `format` to a [DataType](arrow::datatypes::DataType). +/// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings +fn to_datatype(format: &str) -> Result { + Ok(match format { + "n" => DataType::Null, + "b" => DataType::Boolean, + "c" => DataType::Int8, + "C" => DataType::UInt8, + "s" => DataType::Int16, + "S" => DataType::UInt16, + "i" => DataType::Int32, + "I" => DataType::UInt32, + "l" => DataType::Int64, + "L" => DataType::UInt64, + "e" => DataType::Float16, + "f" => DataType::Float32, + "g" => DataType::Float64, + "z" => DataType::Binary, + "Z" => DataType::LargeBinary, + "u" => DataType::Utf8, + "U" => DataType::LargeUtf8, + _ => { + return Err(ArrowError::CDataInterface( + "The datatype \"{}\" is still not supported in Rust implementation" + .to_string(), + )) + } + }) +} + +/// the inverse of [to_datatype] +fn from_datatype(datatype: &DataType) -> Result { + Ok(match datatype { + DataType::Null => "n", + DataType::Boolean => "b", + DataType::Int8 => "c", + DataType::UInt8 => "C", + DataType::Int16 => "s", + DataType::UInt16 => "S", + DataType::Int32 => "i", + DataType::UInt32 => "I", + DataType::Int64 => "l", + DataType::UInt64 => "L", + DataType::Float16 => "e", + DataType::Float32 => "f", + DataType::Float64 => "g", + DataType::Binary => "z", + DataType::LargeBinary => "Z", + DataType::Utf8 => "u", + DataType::LargeUtf8 => "U", + _ => { + return Err(ArrowError::CDataInterface( + "The datatype \"{:?}\" is still not supported in Rust implementation" + .to_string(), + )) + } + } + .to_string()) +} + +// returns the number of bytes of a given type, when stored in a buffer. +// applicable to types that can be represented as buffers +fn byte_width(data_type: &DataType) -> Result { + Ok(match data_type { + DataType::UInt32 => 4, + DataType::UInt64 => 8, + DataType::Int64 => 8, + _ => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" is still not supported in Rust implementation", + data_type + ))) + } + }) +} + +/// ABI-compatible struct for ArrowArray from C Data Interface +/// See https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +/// This was created by bindgen +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_ArrowArray { + pub length: i64, + pub null_count: i64, + pub offset: i64, + pub n_buffers: i64, + pub n_children: i64, + pub buffers: *mut *const ::std::os::raw::c_void, + pub children: *mut *mut FFI_ArrowArray, + pub dictionary: *mut FFI_ArrowArray, + pub release: ::std::option::Option, + pub private_data: *mut ::std::os::raw::c_void, +} + +// callback used to drop [FFI_ArrowArray]. +extern "C" fn release(array: *mut FFI_ArrowArray) { + if array.is_null() { + return (); + } + let value = unsafe { &*array }; + + (0..value.n_buffers).for_each(|i| { + let ptr = unsafe { *value.buffers.add(i as usize) } as *mut u8; + if !ptr.is_null() { + // todo: store in the private data the buffer's capacity, as it needs to be passed here. + unsafe { memory::free_aligned(ptr, 4) }; + } + }); +} + +impl FFI_ArrowArray { + /// creates a new `FFI_ArrowArray` from existing data. + /// This is used to export an `FFI_ArrowArray`. + fn new( + length: i64, + null_count: i64, + offset: i64, + n_buffers: i64, + buffers: Vec>, + ) -> Self { + // build an array of pointers to the buffers, forgetting the buffers + let mut buffers_ptr = buffers + .iter() + .map(|maybe_buffer| { + match maybe_buffer { + Some(b) => { + let ptr = b.raw_data() as *const std::os::raw::c_void; + // forget the buffer + mem::forget(b); + ptr + } + None => std::ptr::null(), + } + }) + .collect::>(); + mem::forget(buffers); + let buffers = buffers_ptr.as_mut_ptr(); + // forget the array of buffers as `release` will take care of them. + mem::forget(buffers_ptr); + + Self { + length, + null_count, + offset, + n_buffers, + n_children: 0, + buffers, + children: std::ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: Some(release), + private_data: std::ptr::null_mut(), + } + } + + /// returns the buffer at index `i`. It may not exist (null pointer) + /// `offset` must match the offset of each of the valid types, in bytes. + /// # Panic + /// This function panics if i is larger or equal to `n_buffers`. + fn buffer(&self, i: usize, offset: usize) -> Option { + assert!(i < self.n_buffers as usize); + if self.buffers.is_null() { + return None; + } + let buffers = self.buffers as *mut *const u8; + + let ptr = unsafe { *buffers.add(i) }; + + if ptr.is_null() { + return None; + } else { + let len = self.length as usize * offset; + Some(unsafe { Buffer::from_unowned(ptr, len, self.clone()) }) + } + } +} + +impl Drop for FFI_ArrowArray { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +/// Struct containing the necessary data and schema to move an Array from and to the C Data Interface. +/// Its main responsibility is to expose functionality that requires +/// both [FFI_ArrowArray] and [FFI_ArrowSchema]. +/// +/// To export to the C Data Interface, use [ArrowArray::new] and [ArrowArray::to_raw]. +/// To import from the C Data Interface, use [ArrowArray::try_from_raw]. +#[derive(Debug)] +pub struct ArrowArray { + // invariant: neither of them is null. We just do not own them and their lifetime is not monitored + // by Rust + array: *const FFI_ArrowArray, + schema: *const FFI_ArrowSchema, +} + +impl ArrowArray { + /// creates a new `ArrowArray`. This is used to export to the C Data Interface. + pub fn try_new( + data_type: &DataType, + len: usize, + null_count: usize, + null_buffer: Option, + offset: usize, + buffers: Vec, + child_data: Vec, + ) -> Result { + let format = from_datatype(data_type)?; + // * insert the null buffer at the start + // * make all others `Option`. + let new_buffers = iter::once(null_buffer) + .chain(buffers.iter().map(|b| Some(b.clone()))) + .collect::>(); + + // allocate both structures in the heap + let schema = Box::into_raw(Box::new(FFI_ArrowSchema::new(&format))); + let array = Box::into_raw(Box::new(FFI_ArrowArray::new( + len as i64, + null_count as i64, + offset as i64, + new_buffers.len() as i64, + new_buffers, + ))); + + Ok(ArrowArray { schema, array }) + } + + /// creates a new `ArrowArray`. This is used to import from the C Data Interface + pub unsafe fn try_from_raw( + array: *const FFI_ArrowArray, + schema: *const FFI_ArrowSchema, + ) -> Result { + if array.is_null() || schema.is_null() { + return Err(ArrowError::MemoryError( + "At least one of the pointers passed to `try_from_raw` is null" + .to_string(), + )); + }; + Ok(Self { array, schema }) + } + + // private method to access `array`. This is safe because [ArrowArray::try_from_raw] asserts that the pointer is not null. + #[inline] + fn get_array(&self) -> &FFI_ArrowArray { + unsafe { self.array.as_ref() }.unwrap() + } + + /// exports [ArrowArray]] to the C Data Interface + pub fn to_raw(this: ArrowArray) -> (*const FFI_ArrowArray, *const FFI_ArrowSchema) { + (this.array, this.schema) + } + + /// returns the null bit buffer. + /// Rust implementation uses a buffer that is not part of the array of buffers. + /// The C Data interface's null buffer is part of the array of buffers. + pub fn null_bit_buffer(&self) -> Option { + // the null bitmaps are always 4 bytes? + self.get_array().buffer(0, 4) + } + + /// returns all buffers, as organized by Rust. + pub fn buffers(&self) -> Result> { + // todo: width depends on the buffer... + let width = byte_width(&self.data_type()?)?; + let array = self.get_array(); + Ok((1..array.n_buffers) + .map(|i| array.buffer(i as usize, width).unwrap()) + .collect()) + } + + /// the length of the array + pub fn len(&self) -> usize { + self.get_array().length as usize + } + + /// the offset of the array + pub fn offset(&self) -> usize { + self.get_array().offset as usize + } + + /// the null count of the array + pub fn null_count(&self) -> usize { + self.get_array().null_count as usize + } + + /// the data_type as declared in the schema + pub fn data_type(&self) -> Result { + to_datatype(unsafe { self.schema.as_ref() }.unwrap().format()) + } +} + +// todo: add testing diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index b7090f03a7b..a0f592048c4 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -134,10 +134,12 @@ pub mod array; pub mod bitmap; pub mod buffer; +pub mod bytes; pub mod compute; pub mod csv; pub mod datatypes; pub mod error; +pub mod ffi; pub mod ipc; pub mod json; pub mod memory;