diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index df564f9583b..ba85c611f8b 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -27,9 +27,7 @@ use std::fmt::{Debug, Formatter}; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write}; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::slice::from_raw_parts; -#[cfg(feature = "simd")] -use std::slice::from_raw_parts_mut; +use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; use crate::array::{BufferBuilderTrait, UInt8BufferBuilder}; @@ -68,14 +66,11 @@ struct BufferData { impl PartialEq for BufferData { fn eq(&self, other: &BufferData) -> bool { - if self.len != other.len { - return false; - } if self.capacity != other.capacity { return false; } - unsafe { memory::memcmp(self.ptr, other.ptr, self.len) == 0 } + self.data() == other.data() } } @@ -96,16 +91,22 @@ impl Debug for BufferData { self.ptr, self.len, self.capacity )?; - unsafe { - f.debug_list() - .entries(std::slice::from_raw_parts(self.ptr, self.len).iter()) - .finish()?; - } + f.debug_list().entries(self.data().iter()).finish()?; write!(f, " }}") } } +impl BufferData { + fn data(&self) -> &[u8] { + if self.ptr.is_null() { + &[] + } else { + unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } + } +} + impl Buffer { /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. @@ -194,13 +195,13 @@ impl Buffer { /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + &self.data.data()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. pub fn slice(&self, offset: usize) -> Self { assert!( - self.offset + offset <= self.len(), + offset <= self.len(), "the offset of the new Buffer cannot exceed the existing length" ); Self { @@ -511,12 +512,20 @@ impl MutableBuffer { /// Returns the data stored in this buffer as a slice. pub fn data(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + if self.data.is_null() { + &[] + } else { + unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + } } /// Returns the data stored in this buffer as a mutable slice. pub fn data_mut(&mut self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.raw_data() as *mut u8, self.len()) } + if self.data.is_null() { + &mut [] + } else { + unsafe { std::slice::from_raw_parts_mut(self.raw_data_mut(), self.len()) } + } } /// Returns a raw pointer for this buffer. @@ -527,6 +536,10 @@ impl MutableBuffer { self.data } + pub fn raw_data_mut(&mut self) -> *mut u8 { + self.data + } + /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { let buffer_data = BufferData { @@ -541,6 +554,18 @@ impl MutableBuffer { offset: 0, } } + + /// View buffer as typed slice. + pub fn typed_data_mut(&mut self) -> &mut [T] { + assert_eq!(self.len() % mem::size_of::(), 0); + assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); + unsafe { + from_raw_parts_mut( + self.raw_data() as *mut T, + self.len() / mem::size_of::(), + ) + } + } } impl Drop for MutableBuffer { @@ -665,6 +690,7 @@ mod tests { assert_eq!(empty_slice, buf4.data()); assert_eq!(0, buf4.len()); assert!(buf4.is_empty()); + assert_eq!(buf2.slice(2).data(), &[10]); } #[test] diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 342e3642c92..137b9ab6a3a 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -19,10 +19,8 @@ use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::mem::size_of; -use std::mem::transmute; use std::rc::Rc; use std::result::Result::Ok; -use std::slice::from_raw_parts_mut; use std::sync::Arc; use std::vec::Vec; @@ -151,120 +149,63 @@ impl ArrayReader for PrimitiveArrayReader { // convert to arrays let array = match (&self.data_type, T::get_physical_type()) { - (ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe { - BoolConverter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int8, PhysicalType::INT32) => unsafe { - Int8Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int16, PhysicalType::INT32) => unsafe { - Int16Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int32, PhysicalType::INT32) => unsafe { - Int32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt8, PhysicalType::INT32) => unsafe { - UInt8Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt16, PhysicalType::INT32) => unsafe { - UInt16Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt32, PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int64, PhysicalType::INT64) => unsafe { - Int64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt64, PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Float32, PhysicalType::FLOAT) => unsafe { - Float32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Float64, PhysicalType::DOUBLE) => unsafe { - Float64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Timestamp(_, _), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Date32(_), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Date64(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Time32(_), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Time64(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Duration(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, + (ArrowType::Boolean, PhysicalType::BOOLEAN) => { + BoolConverter::convert(self.record_reader.cast::()) + } + (ArrowType::Int8, PhysicalType::INT32) => { + Int8Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int16, PhysicalType::INT32) => { + Int16Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int32, PhysicalType::INT32) => { + Int32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt8, PhysicalType::INT32) => { + UInt8Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt16, PhysicalType::INT32) => { + UInt16Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt32, PhysicalType::INT32) => { + UInt32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int64, PhysicalType::INT64) => { + Int64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt64, PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Float32, PhysicalType::FLOAT) => { + Float32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Float64, PhysicalType::DOUBLE) => { + Float64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Timestamp(_, _), PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Date32(_), PhysicalType::INT32) => { + UInt32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Date64(_), PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Time32(_), PhysicalType::INT32) => { + UInt32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Time64(_), PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { + UInt32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Duration(_), PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } (arrow_type, physical_type) => Err(general_err!( "Reading {:?} type from parquet {:?} is not supported yet.", arrow_type, @@ -562,10 +503,7 @@ impl ArrayReader for StructArrayReader { let mut def_level_data_buffer = MutableBuffer::new(buffer_size); def_level_data_buffer.resize(buffer_size)?; - let def_level_data = unsafe { - let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); - from_raw_parts_mut(ptr, children_array_len) - }; + let def_level_data = def_level_data_buffer.typed_data_mut(); def_level_data .iter_mut() diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index de42ae7f953..2acc01ca33e 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,8 +16,8 @@ // under the License. use std::cmp::{max, min}; +use std::mem::align_of; use std::mem::size_of; -use std::mem::transmute; use std::mem::{replace, swap}; use std::slice; @@ -28,6 +28,7 @@ use crate::schema::types::ColumnDescPtr; use arrow::array::{BooleanBufferBuilder, BufferBuilderTrait}; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::memory; const MIN_BATCH_SIZE: usize = 1024; @@ -53,39 +54,40 @@ pub struct RecordReader { } #[derive(Debug)] -struct FatPtr { - ptr: *const T, - len: usize, +struct FatPtr<'a, T> { + ptr: &'a mut [T], } -impl FatPtr { - fn new(ptr: *const T, len: usize) -> Self { - Self { ptr, len } +impl<'a, T> FatPtr<'a, T> { + fn new(ptr: &'a mut [T]) -> Self { + Self { ptr } } - fn with_offset(buf: &MutableBuffer, offset: usize) -> Self { + fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self { FatPtr::::with_offset_and_size(buf, offset, size_of::()) } fn with_offset_and_size( - buf: &MutableBuffer, + buf: &'a mut MutableBuffer, offset: usize, type_size: usize, ) -> Self { + assert!(align_of::() <= memory::ALIGNMENT); + // TODO Prevent this from being called with non primitive types (like `Box`) unsafe { - FatPtr::new( - transmute::<*const u8, *mut T>(buf.raw_data()).add(offset), + FatPtr::new(slice::from_raw_parts_mut( + &mut *(buf.raw_data() as *mut T).add(offset), buf.capacity() / type_size - offset, - ) + )) } } fn to_slice(&self) -> &[T] { - unsafe { slice::from_raw_parts(self.ptr, self.len) } + self.ptr } - fn to_slice_mut(&self) -> &mut [T] { - unsafe { slice::from_raw_parts_mut(self.ptr as *mut T, self.len) } + fn to_slice_mut(&mut self) -> &mut [T] { + self.ptr } } @@ -121,6 +123,26 @@ impl RecordReader { } } + pub(crate) fn cast(&mut self) -> &mut RecordReader { + trait CastRecordReader { + fn cast(&mut self) -> &mut RecordReader; + } + + impl CastRecordReader for RecordReader { + default fn cast(&mut self) -> &mut RecordReader { + panic!("Attempted to cast RecordReader to the wrong type") + } + } + + impl CastRecordReader for RecordReader { + fn cast(&mut self) -> &mut RecordReader { + self + } + } + + CastRecordReader::::cast(self) + } + /// Set the current page reader. pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { self.column_reader = @@ -198,10 +220,10 @@ impl RecordReader { ); new_buffer.resize(num_left_values * size_of::())?; - let new_def_levels = FatPtr::::with_offset(&new_buffer, 0); + let mut new_def_levels = FatPtr::::with_offset(&mut new_buffer, 0); let new_def_levels = new_def_levels.to_slice_mut(); let left_def_levels = - FatPtr::::with_offset(&def_levels_buf, self.num_values); + FatPtr::::with_offset(def_levels_buf, self.num_values); let left_def_levels = left_def_levels.to_slice(); new_def_levels[0..num_left_values] @@ -227,10 +249,10 @@ impl RecordReader { ); new_buffer.resize(num_left_values * size_of::())?; - let new_rep_levels = FatPtr::::with_offset(&new_buffer, 0); + let mut new_rep_levels = FatPtr::::with_offset(&mut new_buffer, 0); let new_rep_levels = new_rep_levels.to_slice_mut(); let left_rep_levels = - FatPtr::::with_offset(&rep_levels_buf, self.num_values); + FatPtr::::with_offset(rep_levels_buf, self.num_values); let left_rep_levels = left_rep_levels.to_slice(); new_rep_levels[0..num_left_values] @@ -254,11 +276,11 @@ impl RecordReader { let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); new_buffer.resize(num_left_values * T::get_type_size())?; - let new_records = - FatPtr::::with_offset_and_size(&new_buffer, 0, T::get_type_size()); + let mut new_records = + FatPtr::::with_offset_and_size(&mut new_buffer, 0, T::get_type_size()); let new_records = new_records.to_slice_mut(); - let left_records = FatPtr::::with_offset_and_size( - &self.records, + let mut left_records = FatPtr::::with_offset_and_size( + &mut self.records, self.num_values, T::get_type_size(), ); @@ -336,21 +358,22 @@ impl RecordReader { } // Convert mutable buffer spaces to mutable slices - let values_buf = FatPtr::::with_offset_and_size( - &self.records, + let mut values_buf = FatPtr::::with_offset_and_size( + &mut self.records, self.values_written, T::get_type_size(), ); + let values_written = self.values_written; let mut def_levels_buf = self .def_levels - .as_ref() - .map(|buf| FatPtr::::with_offset(buf, self.values_written)); + .as_mut() + .map(|buf| FatPtr::::with_offset(buf, values_written)); let mut rep_levels_buf = self .rep_levels - .as_ref() - .map(|buf| FatPtr::::with_offset(buf, self.values_written)); + .as_mut() + .map(|buf| FatPtr::::with_offset(buf, values_written)); let (values_read, levels_read) = self.column_reader.as_mut().unwrap().read_batch( @@ -421,7 +444,7 @@ impl RecordReader { fn split_records(&mut self, records_to_read: usize) -> Result { let rep_levels_buf = self .rep_levels - .as_ref() + .as_mut() .map(|buf| FatPtr::::with_offset(buf, 0)); let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice()); diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index d7e20fa5533..c0870d0065c 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -26,7 +26,10 @@ use crate::basic::Type; use crate::column::reader::{ColumnReader, ColumnReaderImpl}; use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; -use crate::util::memory::{ByteBuffer, ByteBufferPtr}; +use crate::util::{ + bit_util::{from_ne_slice, FromBytes}, + memory::{ByteBuffer, ByteBufferPtr}, +}; use std::str::from_utf8; /// Rust representation for logical type INT96, value is backed by an array of `u32`. @@ -303,6 +306,19 @@ pub trait AsBytes { fn as_bytes(&self) -> &[u8]; } +/// Converts an slice of a data type to a slice of bytes. +pub trait SliceAsBytes: Sized { + /// Returns slice of bytes for a slice of this data type. + fn slice_as_bytes(self_: &[Self]) -> &[u8]; + fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8]; +} + +impl AsBytes for [u8] { + fn as_bytes(&self) -> &[u8] { + self + } +} + macro_rules! gen_as_bytes { ($source_ty:ident) => { impl AsBytes for $source_ty { @@ -315,17 +331,44 @@ macro_rules! gen_as_bytes { } } } + impl SliceAsBytes for $source_ty { + fn slice_as_bytes(self_: &[Self]) -> &[u8] { + unsafe { + std::slice::from_raw_parts( + self_.as_ptr() as *const u8, + std::mem::size_of::<$source_ty>() * self_.len(), + ) + } + } + fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] { + unsafe { + std::slice::from_raw_parts_mut( + self_.as_mut_ptr() as *mut u8, + std::mem::size_of::<$source_ty>() * self_.len(), + ) + } + } + } }; } -gen_as_bytes!(bool); -gen_as_bytes!(u8); +gen_as_bytes!(i8); +gen_as_bytes!(i16); gen_as_bytes!(i32); -gen_as_bytes!(u32); gen_as_bytes!(i64); +gen_as_bytes!(u8); +gen_as_bytes!(u16); +gen_as_bytes!(u32); +gen_as_bytes!(u64); gen_as_bytes!(f32); gen_as_bytes!(f64); +impl AsBytes for bool { + fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) } + } +} + impl AsBytes for Int96 { fn as_bytes(&self) -> &[u8] { unsafe { @@ -371,7 +414,8 @@ pub trait DataType: 'static { + std::fmt::Debug + std::default::Default + std::clone::Clone - + AsBytes; + + AsBytes + + FromBytes; /// Returns Parquet physical type. fn get_physical_type() -> Type; @@ -400,6 +444,20 @@ pub trait DataType: 'static { Self: Sized; } +// Workaround bug in specialization +pub trait SliceAsBytesDataType: DataType +where + Self::T: SliceAsBytes, +{ +} + +impl SliceAsBytesDataType for T +where + T: DataType, + ::T: SliceAsBytes, +{ +} + macro_rules! make_type { ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => { pub struct $name {} @@ -521,6 +579,40 @@ make_type!( mem::size_of::() ); +impl FromBytes for Int96 { + type Buffer = [u8; 12]; + fn from_le_bytes(_bs: Self::Buffer) -> Self { + unimplemented!() + } + fn from_be_bytes(_bs: Self::Buffer) -> Self { + unimplemented!() + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + let mut i = Int96::new(); + i.set_data( + from_ne_slice(&bs[0..4]), + from_ne_slice(&bs[4..8]), + from_ne_slice(&bs[8..12]), + ); + i + } +} + +// FIXME Needed to satisfy the constraint of many decoding functions but ByteArray does not +// appear to actual be converted directly from bytes +impl FromBytes for ByteArray { + type Buffer = [u8; 8]; + fn from_le_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_be_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_ne_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index c3d5d7c83f8..647d423f72e 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -17,7 +17,7 @@ //! Contains all supported decoders for Parquet. -use std::{cmp, marker::PhantomData, mem, slice::from_raw_parts_mut}; +use std::{cmp, marker::PhantomData, mem}; use super::rle::RleDecoder; @@ -28,7 +28,7 @@ use crate::data_type::*; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::{ - bit_util::{self, BitReader}, + bit_util::{self, BitReader, FromBytes}, memory::{ByteBuffer, ByteBufferPtr}, }; @@ -188,7 +188,17 @@ impl Decoder for PlainDecoder { } #[inline] - default fn get(&mut self, buffer: &mut [T::T]) -> Result { + default fn get(&mut self, _buffer: &mut [T::T]) -> Result { + unreachable!() + } +} + +impl Decoder for PlainDecoder +where + T::T: SliceAsBytes, +{ + #[inline] + fn get(&mut self, buffer: &mut [T::T]) -> Result { assert!(self.data.is_some()); let data = self.data.as_mut().unwrap(); @@ -198,8 +208,7 @@ impl Decoder for PlainDecoder { if bytes_left < bytes_to_decode { return Err(eof_err!("Not enough bytes to decode")); } - let raw_buffer: &mut [u8] = - unsafe { from_raw_parts_mut(buffer.as_ptr() as *mut u8, bytes_to_decode) }; + let raw_buffer = &mut T::T::slice_as_bytes_mut(buffer)[..bytes_to_decode]; raw_buffer.copy_from_slice(data.range(self.start, bytes_to_decode).as_ref()); self.start += bytes_to_decode; self.num_values -= num_values; @@ -245,7 +254,7 @@ impl Decoder for PlainDecoder { Ok(()) } - fn get(&mut self, buffer: &mut [bool]) -> Result { + default fn get(&mut self, buffer: &mut [bool]) -> Result { assert!(self.bit_reader.is_some()); let bit_reader = self.bit_reader.as_mut().unwrap(); @@ -541,7 +550,10 @@ impl DeltaBitPackDecoder { /// Loads delta into mini block. #[inline] - fn load_deltas_in_mini_block(&mut self) -> Result<()> { + fn load_deltas_in_mini_block(&mut self) -> Result<()> + where + T::T: FromBytes, + { self.deltas_in_mini_block.clear(); if self.use_batch { self.deltas_in_mini_block @@ -566,7 +578,10 @@ impl DeltaBitPackDecoder { } } -impl Decoder for DeltaBitPackDecoder { +impl Decoder for DeltaBitPackDecoder +where + T::T: FromBytes, +{ // # of total values is derived from encoding #[inline] default fn set_data(&mut self, data: ByteBufferPtr, _: usize) -> Result<()> { @@ -928,7 +943,7 @@ impl Decoder for DeltaByteArrayDecoder [u8; 4] { - unsafe { mem::transmute::(v as u32) } + (v as u32).to_ne_bytes() } /// A util trait to convert slices of different types to byte arrays @@ -1448,18 +1463,11 @@ mod tests { impl ToByteArray for T where - T: DataType, + T: SliceAsBytesDataType, + ::T: SliceAsBytes, { default fn to_byte_array(data: &[T::T]) -> Vec { - let mut v = vec![]; - let type_len = std::mem::size_of::(); - v.extend_from_slice(unsafe { - std::slice::from_raw_parts( - data.as_ptr() as *const u8, - data.len() * type_len, - ) - }); - v + ::T::slice_as_bytes(data).to_vec() } } @@ -1482,11 +1490,7 @@ mod tests { fn to_byte_array(data: &[Int96]) -> Vec { let mut v = vec![]; for d in data { - unsafe { - let copy = - std::slice::from_raw_parts(d.data().as_ptr() as *const u8, 12); - v.extend_from_slice(copy); - }; + v.extend_from_slice(d.as_bytes()); } v } diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs index c9cad12e42d..98c61a05329 100644 --- a/rust/parquet/src/encodings/encoding.rs +++ b/rust/parquet/src/encodings/encoding.rs @@ -17,7 +17,7 @@ //! Contains all supported encoders for Parquet. -use std::{cmp, io::Write, marker::PhantomData, mem, slice}; +use std::{cmp, io::Write, marker::PhantomData, mem}; use crate::basic::*; use crate::data_type::*; @@ -129,17 +129,6 @@ impl PlainEncoder { } impl Encoder for PlainEncoder { - default fn put(&mut self, values: &[T::T]) -> Result<()> { - let bytes = unsafe { - slice::from_raw_parts( - values as *const [T::T] as *const u8, - mem::size_of::() * values.len(), - ) - }; - self.buffer.write(bytes)?; - Ok(()) - } - fn encoding(&self) -> Encoding { Encoding::PLAIN } @@ -156,6 +145,21 @@ impl Encoder for PlainEncoder { Ok(self.buffer.consume()) } + + default fn put(&mut self, _values: &[T::T]) -> Result<()> { + unreachable!() + } +} + +impl Encoder for PlainEncoder +where + T::T: SliceAsBytes, +{ + default fn put(&mut self, values: &[T::T]) -> Result<()> { + let bytes = T::T::slice_as_bytes(values); + self.buffer.write(bytes)?; + Ok(()) + } } impl Encoder for PlainEncoder { @@ -634,11 +638,7 @@ impl DeltaBitPackEncoder { self.bit_writer.put_zigzag_vlq_int(min_delta); // Slice to store bit width for each mini block - // apply unsafe allocation to avoid double mutable borrow - let mini_block_widths: &mut [u8] = unsafe { - let tmp_slice = self.bit_writer.get_next_byte_ptr(self.num_mini_blocks)?; - slice::from_raw_parts_mut(tmp_slice.as_ptr() as *mut u8, self.num_mini_blocks) - }; + let offset = self.bit_writer.skip(self.num_mini_blocks)?; for i in 0..self.num_mini_blocks { // Find how many values we need to encode - either block size or whatever @@ -657,7 +657,7 @@ impl DeltaBitPackEncoder { // Compute bit width to store (max_delta - min_delta) let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)); - mini_block_widths[i] = bit_width as u8; + self.bit_writer.write_at(offset + i, bit_width as u8); // Encode values in current mini block using min_delta and bit_width for j in 0..n { diff --git a/rust/parquet/src/encodings/rle.rs b/rust/parquet/src/encodings/rle.rs index 8463df7876a..26df49f70fb 100644 --- a/rust/parquet/src/encodings/rle.rs +++ b/rust/parquet/src/encodings/rle.rs @@ -15,14 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{ - cmp, - mem::{size_of, transmute_copy}, -}; +use std::{cmp, mem::size_of}; use crate::errors::{ParquetError, Result}; use crate::util::{ - bit_util::{self, BitReader, BitWriter}, + bit_util::{self, from_ne_slice, BitReader, BitWriter, FromBytes}, memory::ByteBufferPtr, }; @@ -369,7 +366,7 @@ impl RleDecoder { } #[inline] - pub fn get(&mut self) -> Result> { + pub fn get(&mut self) -> Result> { assert!(size_of::() <= 8); while self.rle_left <= 0 && self.bit_packed_left <= 0 { @@ -379,13 +376,13 @@ impl RleDecoder { } let value = if self.rle_left > 0 { - let rle_value = unsafe { - transmute_copy::( - self.current_value - .as_mut() - .expect("current_value should be Some"), - ) - }; + let rle_value = from_ne_slice( + &self + .current_value + .as_mut() + .expect("current_value should be Some") + .to_ne_bytes(), + ); self.rle_left -= 1; rle_value } else { @@ -402,7 +399,7 @@ impl RleDecoder { } #[inline] - pub fn get_batch(&mut self, buffer: &mut [T]) -> Result { + pub fn get_batch(&mut self, buffer: &mut [T]) -> Result { assert!(self.bit_reader.is_some()); assert!(size_of::() <= 8); @@ -413,9 +410,9 @@ impl RleDecoder { let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize); for i in 0..num_values { - let repeated_value = unsafe { - transmute_copy::(self.current_value.as_mut().unwrap()) - }; + let repeated_value = from_ne_slice( + &self.current_value.as_mut().unwrap().to_ne_bytes(), + ); buffer[values_read + i] = repeated_value; } self.rle_left -= num_values as u32; diff --git a/rust/parquet/src/file/statistics.rs b/rust/parquet/src/file/statistics.rs index 421361cddb5..edd3c63f24f 100644 --- a/rust/parquet/src/file/statistics.rs +++ b/rust/parquet/src/file/statistics.rs @@ -44,6 +44,7 @@ use parquet_format::Statistics as TStatistics; use crate::basic::Type; use crate::data_type::*; +use crate::util::bit_util::from_ne_slice; // Macro to generate methods create Statistics. macro_rules! statistics_new_func { @@ -148,19 +149,11 @@ pub fn from_thrift( // min/max statistics for INT96 columns. let min = min.map(|data| { assert_eq!(data.len(), 12); - unsafe { - let raw = - std::slice::from_raw_parts(data.as_ptr() as *mut u32, 3); - Int96::from(Vec::from(raw)) - } + from_ne_slice::(&data) }); let max = max.map(|data| { assert_eq!(data.len(), 12); - unsafe { - let raw = - std::slice::from_raw_parts(data.as_ptr() as *mut u32, 3); - Int96::from(Vec::from(raw)) - } + from_ne_slice::(&data) }); Statistics::int96(min, max, distinct_count, null_count, old_format) } diff --git a/rust/parquet/src/record/api.rs b/rust/parquet/src/record/api.rs index bf7dc72e397..c011da8bf50 100644 --- a/rust/parquet/src/record/api.rs +++ b/rust/parquet/src/record/api.rs @@ -551,8 +551,7 @@ impl Field { match descr.physical_type() { PhysicalType::BYTE_ARRAY => match descr.logical_type() { LogicalType::UTF8 | LogicalType::ENUM | LogicalType::JSON => { - let value = - unsafe { String::from_utf8_unchecked(value.data().to_vec()) }; + let value = String::from_utf8(value.data().to_vec()).unwrap(); Field::Str(value) } LogicalType::BSON | LogicalType::NONE => Field::Bytes(value), diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index ddae990ae3c..34541af3dc1 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -15,14 +15,67 @@ // specific language governing permissions and limitations // under the License. -use std::{ - cmp, - mem::{size_of, transmute_copy}, -}; +use std::{cmp, mem::size_of}; +use crate::data_type::AsBytes; use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; +pub fn from_ne_slice(bs: &[u8]) -> T { + let mut b = T::Buffer::default(); + { + let b = b.as_mut(); + let bs = &bs[..b.len()]; + b.copy_from_slice(bs); + } + T::from_ne_bytes(b) +} + +pub trait FromBytes: Sized { + type Buffer: AsMut<[u8]> + Default; + fn from_le_bytes(bs: Self::Buffer) -> Self; + fn from_be_bytes(bs: Self::Buffer) -> Self; + fn from_ne_bytes(bs: Self::Buffer) -> Self; +} + +macro_rules! from_le_bytes { + ($($ty: ty),*) => { + $( + impl FromBytes for $ty { + type Buffer = [u8; size_of::()]; + fn from_le_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_le_bytes(bs) + } + fn from_be_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_be_bytes(bs) + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_ne_bytes(bs) + } + } + )* + }; +} + +impl FromBytes for bool { + type Buffer = [u8; 1]; + fn from_le_bytes(bs: Self::Buffer) -> Self { + Self::from_ne_bytes(bs) + } + fn from_be_bytes(bs: Self::Buffer) -> Self { + Self::from_ne_bytes(bs) + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + match bs[0] { + 0 => false, + 1 => true, + _ => panic!("Invalid byte when reading bool"), + } + } +} + +from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64, f32, f64 } + /// Reads `$size` of bytes from `$src`, and reinterprets them as type `$ty`, in /// little-endian order. `$ty` must implement the `Default` trait. Otherwise this won't /// compile. @@ -30,50 +83,42 @@ use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; macro_rules! read_num_bytes { ($ty:ty, $size:expr, $src:expr) => {{ assert!($size <= $src.len()); - let mut data: $ty = Default::default(); - unsafe { - std::ptr::copy_nonoverlapping( - $src.as_ptr(), - &mut data as *mut $ty as *mut u8, - $size, - ); - } - data + let mut buffer = <$ty as $crate::util::bit_util::FromBytes>::Buffer::default(); + buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]); + <$ty>::from_ne_bytes(buffer) }}; } /// Converts value `val` of type `T` to a byte vector, by reading `num_bytes` from `val`. /// NOTE: if `val` is less than the size of `T` then it can be truncated. #[inline] -pub fn convert_to_bytes(val: &T, num_bytes: usize) -> Vec { +pub fn convert_to_bytes(val: &T, num_bytes: usize) -> Vec +where + T: ?Sized + AsBytes, +{ let mut bytes: Vec = vec![0; num_bytes]; - memcpy_value(val, num_bytes, &mut bytes); + memcpy_value(val.as_bytes(), num_bytes, &mut bytes); bytes } #[inline] pub fn memcpy(source: &[u8], target: &mut [u8]) { assert!(target.len() >= source.len()); - unsafe { - std::ptr::copy_nonoverlapping(source.as_ptr(), target.as_mut_ptr(), source.len()) - } + target[..source.len()].copy_from_slice(source) } #[inline] -pub fn memcpy_value(source: &T, num_bytes: usize, target: &mut [u8]) { +pub fn memcpy_value(source: &T, num_bytes: usize, target: &mut [u8]) +where + T: ?Sized + AsBytes, +{ assert!( target.len() >= num_bytes, "Not enough space. Only had {} bytes but need to put {} bytes", target.len(), num_bytes ); - unsafe { - std::ptr::copy_nonoverlapping( - source as *const T as *const u8, - target.as_mut_ptr(), - num_bytes, - ) - } + memcpy(&source.as_bytes()[..num_bytes], target) } /// Returns the ceil of value/divisor @@ -276,6 +321,10 @@ impl BitWriter { self.max_bytes } + pub fn write_at(&mut self, offset: usize, value: u8) { + self.buffer[offset] = value; + } + /// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer. /// The `num_bits` must not be greater than 64. This is bit packed. /// @@ -316,7 +365,7 @@ impl BitWriter { /// /// Returns false if there's not enough room left. True otherwise. #[inline] - pub fn put_aligned(&mut self, val: T, num_bytes: usize) -> bool { + pub fn put_aligned(&mut self, val: T, num_bytes: usize) -> bool { let result = self.get_next_byte_ptr(num_bytes); if result.is_err() { // TODO: should we return `Result` for this func? @@ -336,7 +385,7 @@ impl BitWriter { /// Returns false if there's not enough room left, or the `pos` is not valid. /// True otherwise. #[inline] - pub fn put_aligned_offset( + pub fn put_aligned_offset( &mut self, val: T, num_bytes: usize, @@ -444,7 +493,7 @@ impl BitReader { /// /// Returns `None` if there's not enough data available. `Some` otherwise. #[inline] - pub fn get_value(&mut self, num_bits: usize) -> Option { + pub fn get_value(&mut self, num_bits: usize) -> Option { assert!(num_bits <= 64); assert!(num_bits <= size_of::() * 8); @@ -466,12 +515,11 @@ impl BitReader { } // TODO: better to avoid copying here - let result: T = unsafe { transmute_copy::(&v) }; - Some(result) + Some(from_ne_slice(v.as_bytes())) } #[inline] - pub fn get_batch(&mut self, batch: &mut [T], num_bits: usize) -> usize { + pub fn get_batch(&mut self, batch: &mut [T], num_bits: usize) -> usize { assert!(num_bits <= 32); assert!(num_bits <= size_of::() * 8); @@ -497,6 +545,7 @@ impl BitReader { unsafe { let in_buf = &self.buffer.data()[self.byte_offset..]; let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32; + // FIXME assert!(memory::is_ptr_aligned(in_ptr)); if size_of::() == 4 { while values_to_read - i >= 32 { let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32; @@ -553,7 +602,7 @@ impl BitReader { /// Returns `Some` if there's enough bytes left to form a value of `T`. /// Otherwise `None`. #[inline] - pub fn get_aligned(&mut self, num_bytes: usize) -> Option { + pub fn get_aligned(&mut self, num_bytes: usize) -> Option { let bytes_read = ceil(self.bit_offset as i64, 8) as usize; if self.byte_offset + bytes_read + num_bytes > self.total_bytes { return None; @@ -944,7 +993,7 @@ mod tests { fn test_get_batch_helper(total: usize, num_bits: usize) where - T: Default + Clone + Debug + Eq, + T: FromBytes + Default + Clone + Debug + Eq, { assert!(num_bits <= 32); let num_bytes = ceil(num_bits as i64, 8); @@ -956,10 +1005,8 @@ mod tests { .collect(); // Generic values used to check against actual values read from `get_batch`. - let expected_values: Vec = values - .iter() - .map(|v| unsafe { transmute_copy::(&v) }) - .collect(); + let expected_values: Vec = + values.iter().map(|v| from_ne_slice(v.as_bytes())).collect(); for i in 0..total { assert!(writer.put_value(values[i] as u64, num_bits)); @@ -993,7 +1040,7 @@ mod tests { fn test_put_aligned_rand_numbers(total: usize, num_bits: usize) where - T: Copy + Default + Debug + PartialEq, + T: Copy + FromBytes + AsBytes + Debug + PartialEq, Standard: Distribution, { assert!(num_bits <= 32); diff --git a/rust/parquet/src/util/hash_util.rs b/rust/parquet/src/util/hash_util.rs index 9207b0f15f2..3388fb2f79d 100644 --- a/rust/parquet/src/util/hash_util.rs +++ b/rust/parquet/src/util/hash_util.rs @@ -20,10 +20,14 @@ use crate::data_type::AsBytes; /// Computes hash value for `data`, with a seed value `seed`. /// The data type `T` must implement the `AsBytes` trait. pub fn hash(data: &T, seed: u32) -> u32 { + hash_(data.as_bytes(), seed) +} + +fn hash_(data: &[u8], seed: u32) -> u32 { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] - { + unsafe { if is_x86_feature_detected!("sse4.2") { - unsafe { crc32_hash(data, seed) } + crc32_hash(data, seed) } else { murmur_hash2_64a(data, seed as u64) as u32 } @@ -34,16 +38,15 @@ const MURMUR_PRIME: u64 = 0xc6a4a7935bd1e995; const MURMUR_R: i32 = 47; /// Rust implementation of MurmurHash2, 64-bit version for 64-bit platforms -fn murmur_hash2_64a(data: &T, seed: u64) -> u64 { - let data_bytes = data.as_bytes(); +/// +/// SAFTETY Only safe on platforms which support unaligned loads (like x86_64) +unsafe fn murmur_hash2_64a(data_bytes: &[u8], seed: u64) -> u64 { let len = data_bytes.len(); let len_64 = (len / 8) * 8; - let data_bytes_64 = unsafe { - std::slice::from_raw_parts( - &data_bytes[0..len_64] as *const [u8] as *const u64, - len / 8, - ) - }; + let data_bytes_64 = std::slice::from_raw_parts( + &data_bytes[0..len_64] as *const [u8] as *const u64, + len / 8, + ); let mut h = seed ^ (MURMUR_PRIME.wrapping_mul(data_bytes.len() as u64)); for v in data_bytes_64 { @@ -92,13 +95,12 @@ fn murmur_hash2_64a(data: &T, seed: u64) -> u64 { /// CRC32 hash implementation using SSE4 instructions. Borrowed from Impala. #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] #[target_feature(enable = "sse4.2")] -unsafe fn crc32_hash(data: &T, seed: u32) -> u32 { +unsafe fn crc32_hash(bytes: &[u8], seed: u32) -> u32 { #[cfg(target_arch = "x86")] use std::arch::x86::*; #[cfg(target_arch = "x86_64")] use std::arch::x86_64::*; - let bytes: &[u8] = data.as_bytes(); let u32_num_bytes = std::mem::size_of::(); let mut num_bytes = bytes.len(); let num_words = num_bytes / u32_num_bytes; @@ -134,14 +136,16 @@ mod tests { #[test] fn test_murmur2_64a() { - let result = murmur_hash2_64a(&"hello", 123); - assert_eq!(result, 2597646618390559622); + unsafe { + let result = murmur_hash2_64a(b"hello", 123); + assert_eq!(result, 2597646618390559622); - let result = murmur_hash2_64a(&"helloworld", 123); - assert_eq!(result, 4934371746140206573); + let result = murmur_hash2_64a(b"helloworld", 123); + assert_eq!(result, 4934371746140206573); - let result = murmur_hash2_64a(&"helloworldparquet", 123); - assert_eq!(result, 2392198230801491746); + let result = murmur_hash2_64a(b"helloworldparquet", 123); + assert_eq!(result, 2392198230801491746); + } } #[test] @@ -149,13 +153,13 @@ mod tests { fn test_crc32() { if is_x86_feature_detected!("sse4.2") { unsafe { - let result = crc32_hash(&"hello", 123); + let result = crc32_hash(b"hello", 123); assert_eq!(result, 2927487359); - let result = crc32_hash(&"helloworld", 123); + let result = crc32_hash(b"helloworld", 123); assert_eq!(result, 314229527); - let result = crc32_hash(&"helloworldparquet", 123); + let result = crc32_hash(b"helloworldparquet", 123); assert_eq!(result, 667078870); } } diff --git a/rust/parquet/src/util/test_common/file_util.rs b/rust/parquet/src/util/test_common/file_util.rs index deed6cd5f4d..11e25e52a79 100644 --- a/rust/parquet/src/util/test_common/file_util.rs +++ b/rust/parquet/src/util/test_common/file_util.rs @@ -36,11 +36,14 @@ pub fn get_test_path(file_name: &str) -> PathBuf { /// Returns file handle for a test parquet file from 'data' directory pub fn get_test_file(file_name: &str) -> fs::File { - let file = fs::File::open(get_test_path(file_name).as_path()); - if file.is_err() { - panic!("Test file {} not found", file_name) - } - file.unwrap() + let path = get_test_path(file_name); + fs::File::open(path.as_path()).unwrap_or_else(|err| { + panic!( + "Test file {} could not be opened, did you do `git submodule update`?: {}", + path.display(), + err + ) + }) } /// Returns file handle for a temp file in 'target' directory with a provided content