diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index aa1d7fe7f56..0df2bb1c1fc 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -21,13 +21,14 @@ #[cfg(feature = "simd")] use packed_simd::u8x64; +use crate::bytes::{Bytes, Deallocation, DropFn}; + use std::cmp; use std::convert::AsRef; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write}; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::ptr::NonNull; use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; @@ -38,92 +39,16 @@ use crate::util::bit_util; #[cfg(feature = "simd")] use std::borrow::BorrowMut; -/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte -/// boundary. Buffer is immutable. -#[derive(PartialEq, Debug)] +/// [`Buffer`] is an immutable contiguous memory region. +#[derive(PartialEq, Clone, Debug)] pub struct Buffer { /// Reference-counted pointer to the internal byte buffer. - data: Arc, + data: Arc, /// The offset into the buffer. offset: usize, } -struct BufferData { - /// The raw pointer into the buffer bytes - ptr: *const u8, - - /// The length (num of bytes) of the buffer. The region `[0, len)` of the buffer - /// is occupied with meaningful data, while the rest `[len, capacity)` is the - /// unoccupied region. 
- len: usize, - - /// Whether this piece of memory is owned by this object - owned: bool, - - /// The capacity (num of bytes) of the buffer - /// Invariant: len <= capacity - capacity: usize, -} - -impl PartialEq for BufferData { - fn eq(&self, other: &BufferData) -> bool { - if self.capacity != other.capacity { - return false; - } - - self.data() == other.data() - } -} - -/// Release the underlying memory when the current buffer goes out of scope -impl Drop for BufferData { - fn drop(&mut self) { - if self.is_allocated() && self.owned { - unsafe { memory::free_aligned(self.ptr as *mut u8, self.capacity) }; - } - } -} - -impl Debug for BufferData { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!( - f, - "BufferData {{ ptr: {:?}, len: {}, capacity: {}, data: ", - self.ptr, self.len, self.capacity - )?; - - f.debug_list().entries(self.data().iter()).finish()?; - - write!(f, " }}") - } -} - -impl BufferData { - fn data(&self) -> &[u8] { - if !self.is_allocated() { - &[] - } else { - unsafe { std::slice::from_raw_parts(self.ptr, self.len) } - } - } - - fn is_zst(&self) -> bool { - self.ptr == BUFFER_INIT.as_ptr() as _ - } - - fn is_allocated(&self) -> bool { - !(self.is_zst() || self.ptr.is_null()) - } -} - -/// -/// SAFETY: (vcq): -/// As you can see this is global and lives as long as the program lives. -/// This is used for lazy allocation in the further steps of buffer allocations. -/// Pointer below is well-aligned, only used for ZSTs and discarded afterwards. -const BUFFER_INIT: NonNull = NonNull::dangling(); - impl Buffer { /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. @@ -139,7 +64,7 @@ impl Buffer { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. 
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, true) + Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) } /// Creates a buffer from an existing memory region (must already be byte-aligned), this @@ -149,70 +74,48 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** + /// * `drop` - Function that frees the corresponding memory region. Note that the region may be larger than `len`. /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn from_unowned(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, false) + /// bytes and that the foreign deallocator frees the region. + pub unsafe fn from_unowned(ptr: *const u8, len: usize, drop: DropFn) -> Self { + Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(drop)) } - /// Creates a buffer from an existing memory region (must already be byte-aligned). - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in bytes - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** - /// * `owned` - Whether the raw parts is owned by this `Buffer`. If true, this `Buffer` will - /// free this memory when dropped, otherwise it will skip freeing the raw parts. - /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. 
+ /// Auxiliary method to create a new Buffer unsafe fn build_with_arguments( ptr: *const u8, len: usize, - capacity: usize, - owned: bool, + deallocation: Deallocation, ) -> Self { - assert!( - memory::is_aligned(ptr, memory::ALIGNMENT), - "memory not aligned" - ); - let buf_data = BufferData { - ptr, - len, - capacity, - owned, - }; + let bytes = Bytes::new(ptr, len, deallocation); Buffer { - data: Arc::new(buf_data), + data: Arc::new(bytes), offset: 0, } } /// Returns the number of bytes in the buffer pub fn len(&self) -> usize { - self.data.len - self.offset + self.data.len() - self.offset } - /// Returns the capacity of this buffer + /// Returns the capacity of this buffer. + /// For externally owned buffers, this returns zero pub fn capacity(&self) -> usize { - self.data.capacity + self.data.capacity() } /// Returns whether the buffer is empty. pub fn is_empty(&self) -> bool { - self.data.len - self.offset == 0 + self.data.len() - self.offset == 0 } /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - &self.data.data()[self.offset..] + &self.data.as_slice()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. @@ -232,7 +135,7 @@ impl Buffer { /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. pub fn raw_data(&self) -> *const u8 { - unsafe { self.data.ptr.add(self.offset) } + unsafe { self.data.raw_data().add(self.offset) } } /// View buffer as typed slice. @@ -253,20 +156,6 @@ impl Buffer { self.len() / mem::size_of::(), ) } - - /// Returns an empty buffer. 
- pub fn empty() -> Self { - unsafe { Self::from_raw_parts(BUFFER_INIT.as_ptr() as _, 0, 0) } - } -} - -impl Clone for Buffer { - fn clone(&self) -> Buffer { - Buffer { - data: self.data.clone(), - offset: self.offset, - } - } } /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly @@ -667,11 +556,8 @@ impl MutableBuffer { /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { - let buffer_data = BufferData { - ptr: self.data, - len: self.len, - capacity: self.capacity, - owned: true, + let buffer_data = unsafe { + Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) }; std::mem::forget(self); Buffer { diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs new file mode 100644 index 00000000000..0100e118020 --- /dev/null +++ b/rust/arrow/src/bytes.rs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains an implementation of a contiguous immutable memory region that knows +//! how to de-allocate itself, [`Bytes`]. +//! Note that this is a low-level functionality of this crate, and is only required to be used +//! when implementing FFI. 
+ +use core::slice; +use std::{fmt::Debug, fmt::Formatter, sync::Arc}; + +use crate::memory; + +/// Function responsible for de-allocating `Bytes`. +pub type DropFn = Arc; + +/// Mode of deallocating memory regions +pub enum Deallocation { + /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment + Native(usize), + /// Foreign deallocation, using some other form of memory deallocation + Foreign(DropFn), +} + +impl Debug for Deallocation { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Deallocation::Native(capacity) => { + write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + } + Deallocation::Foreign(_) => { + write!(f, "Deallocation::Foreign {{ capacity: unknown }}") + } + } + } +} + +/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. +/// This struct's API is inspired by the `bytes::Bytes`, but it is not limited to using rust's +/// global allocator nor u8 alignment. +/// +/// In the most common case, this buffer is allocated using [`allocate_aligned`](memory::allocate_aligned) +/// and deallocated accordingly [`free_aligned`](memory::free_aligned). +/// When the region is allocated by a foreign allocator, [Deallocation::Foreign], this calls the +/// foreign deallocator to deallocate the region when it is no longer needed. +pub struct Bytes { + /// The raw pointer to the beginning of the region + ptr: *const u8, + + /// The number of bytes visible to this region. This is never larger than its capacity (when available). 
+ len: usize, + + /// How to deallocate this region + deallocation: Deallocation, +} + +impl Bytes { + /// Takes ownership of an allocated memory region. + /// + /// # Arguments + /// + /// * `ptr` - Pointer to raw parts + /// * `len` - Length of raw parts in **bytes** + /// * `deallocation` - How the region is deallocated; `Deallocation::Native` carries the capacity allocated for `ptr`, in **bytes** + /// + /// # Safety + /// + /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` + /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + pub unsafe fn new(ptr: *const u8, len: usize, deallocation: Deallocation) -> Bytes { + Bytes { + ptr, + len, + deallocation, + } + } + + #[inline] + pub fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr, self.len) } + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + pub fn raw_data(&self) -> *const u8 { + self.ptr + } + + #[inline] + pub fn raw_data_mut(&mut self) -> *mut u8 { + self.ptr as *mut u8 + } + + pub fn capacity(&self) -> usize { + match self.deallocation { + Deallocation::Native(capacity) => capacity, + // we cannot determine this in general, + // and thus we state that this is externally-owned memory + Deallocation::Foreign(_) => 0, + } + } +} + +impl Drop for Bytes { + #[inline] + fn drop(&mut self) { + match &self.deallocation { + Deallocation::Native(capacity) => { + if !self.ptr.is_null() { + unsafe { memory::free_aligned(self.ptr as *mut u8, *capacity) }; + } + } + Deallocation::Foreign(drop) => { + (drop.clone())(self); + } + } + } +} + +impl PartialEq for Bytes { + fn eq(&self, other: &Bytes) -> bool { + self.as_slice() == other.as_slice() + } +} + +impl Debug for Bytes { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?; + + f.debug_list().entries(self.as_slice().iter()).finish()?; + + write!(f, 
" }}") + } +} + +#[cfg(test)] +mod tests { + use std::cell::Cell; + + use super::*; + + #[test] + fn test_dealloc() { + let a = Box::new(b"hello"); + + let outer = Arc::new(Cell::new(false)); + let inner = outer.clone(); + let dealloc = Arc::new(move |bytes: &mut Bytes| { + inner.set(true); + assert_eq!(bytes.as_slice(), &b"hello"[1..4]); + }); + + let b = unsafe { Bytes::new(a[1..].as_ptr(), 3, Deallocation::Foreign(dealloc)) }; + drop(b); + assert_eq!(outer.as_ref().take(), true); + // the content is still valid (as the dealloc above does not actually free it) + assert_eq!(a.as_ref(), &b"hello"); + } + + #[test] + fn test_dealloc_native() { + let capacity = 5; + let a = memory::allocate_aligned(capacity); + // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior + unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; + } +} diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index b7090f03a7b..c40ce06ee1d 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -134,6 +134,7 @@ pub mod array; pub mod bitmap; pub mod buffer; +pub mod bytes; pub mod compute; pub mod csv; pub mod datatypes;