From 62919d4fea988ed3564c1e4ec1d5c287a64b88bd Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 11:44:23 +0100 Subject: [PATCH 01/20] Better error message for missing test data --- rust/parquet/src/util/test_common/file_util.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rust/parquet/src/util/test_common/file_util.rs b/rust/parquet/src/util/test_common/file_util.rs index deed6cd5f4d..11e25e52a79 100644 --- a/rust/parquet/src/util/test_common/file_util.rs +++ b/rust/parquet/src/util/test_common/file_util.rs @@ -36,11 +36,14 @@ pub fn get_test_path(file_name: &str) -> PathBuf { /// Returns file handle for a test parquet file from 'data' directory pub fn get_test_file(file_name: &str) -> fs::File { - let file = fs::File::open(get_test_path(file_name).as_path()); - if file.is_err() { - panic!("Test file {} not found", file_name) - } - file.unwrap() + let path = get_test_path(file_name); + fs::File::open(path.as_path()).unwrap_or_else(|err| { + panic!( + "Test file {} could not be opened, did you do `git submodule update`?: {}", + path.display(), + err + ) + }) } /// Returns file handle for a temp file in 'target' directory with a provided content From 5a1558997297d54af574a70247e3f77f2fa449cb Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 12:00:01 +0100 Subject: [PATCH 02/20] fix: murmur hash is only safe on platforms with unaligned loads Might be other platforms this works on as well, but those could be added as they are found --- rust/parquet/src/util/hash_util.rs | 46 ++++++++++++++++-------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/rust/parquet/src/util/hash_util.rs b/rust/parquet/src/util/hash_util.rs index 9207b0f15f2..3388fb2f79d 100644 --- a/rust/parquet/src/util/hash_util.rs +++ b/rust/parquet/src/util/hash_util.rs @@ -20,10 +20,14 @@ use crate::data_type::AsBytes; /// Computes hash value for `data`, with a seed value `seed`. /// The data type `T` must implement the `AsBytes` trait. pub fn hash(data: &T, seed: u32) -> u32 { + hash_(data.as_bytes(), seed) +} + +fn hash_(data: &[u8], seed: u32) -> u32 { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] - { + unsafe { if is_x86_feature_detected!("sse4.2") { - unsafe { crc32_hash(data, seed) } + crc32_hash(data, seed) } else { murmur_hash2_64a(data, seed as u64) as u32 } @@ -34,16 +38,15 @@ const MURMUR_PRIME: u64 = 0xc6a4a7935bd1e995; const MURMUR_R: i32 = 47; /// Rust implementation of MurmurHash2, 64-bit version for 64-bit platforms -fn murmur_hash2_64a(data: &T, seed: u64) -> u64 { - let data_bytes = data.as_bytes(); +/// +/// SAFTETY Only safe on platforms which support unaligned loads (like x86_64) +unsafe fn murmur_hash2_64a(data_bytes: &[u8], seed: u64) -> u64 { let len = data_bytes.len(); let len_64 = (len / 8) * 8; - let data_bytes_64 = unsafe { - std::slice::from_raw_parts( - &data_bytes[0..len_64] as *const [u8] as *const u64, - len / 8, - ) - }; + let data_bytes_64 = std::slice::from_raw_parts( + &data_bytes[0..len_64] as *const [u8] as *const u64, + len / 8, + ); let mut h = seed ^ (MURMUR_PRIME.wrapping_mul(data_bytes.len() as u64)); for v in data_bytes_64 { @@ -92,13 +95,12 @@ fn murmur_hash2_64a(data: &T, seed: u64) -> u64 { /// CRC32 hash implementation using SSE4 instructions. Borrowed from Impala. #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] #[target_feature(enable = "sse4.2")] -unsafe fn crc32_hash(data: &T, seed: u32) -> u32 { +unsafe fn crc32_hash(bytes: &[u8], seed: u32) -> u32 { #[cfg(target_arch = "x86")] use std::arch::x86::*; #[cfg(target_arch = "x86_64")] use std::arch::x86_64::*; - let bytes: &[u8] = data.as_bytes(); let u32_num_bytes = std::mem::size_of::(); let mut num_bytes = bytes.len(); let num_words = num_bytes / u32_num_bytes; @@ -134,14 +136,16 @@ mod tests { #[test] fn test_murmur2_64a() { - let result = murmur_hash2_64a(&"hello", 123); - assert_eq!(result, 2597646618390559622); + unsafe { + let result = murmur_hash2_64a(b"hello", 123); + assert_eq!(result, 2597646618390559622); - let result = murmur_hash2_64a(&"helloworld", 123); - assert_eq!(result, 4934371746140206573); + let result = murmur_hash2_64a(b"helloworld", 123); + assert_eq!(result, 4934371746140206573); - let result = murmur_hash2_64a(&"helloworldparquet", 123); - assert_eq!(result, 2392198230801491746); + let result = murmur_hash2_64a(b"helloworldparquet", 123); + assert_eq!(result, 2392198230801491746); + } } #[test] @@ -149,13 +153,13 @@ mod tests { fn test_crc32() { if is_x86_feature_detected!("sse4.2") { unsafe { - let result = crc32_hash(&"hello", 123); + let result = crc32_hash(b"hello", 123); assert_eq!(result, 2927487359); - let result = crc32_hash(&"helloworld", 123); + let result = crc32_hash(b"helloworld", 123); assert_eq!(result, 314229527); - let result = crc32_hash(&"helloworldparquet", 123); + let result = crc32_hash(b"helloworldparquet", 123); assert_eq!(result, 667078870); } } From 56812580e3be47e174ac53a3b00e0031767f7d71 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 15:46:18 +0100 Subject: [PATCH 03/20] fix: Use safe code to read types in bit_util Fixes undefined behaviour that could occur from safe code calling with `T == Box` etc. --- rust/parquet/src/util/bit_util.rs | 44 +++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index ddae990ae3c..c737c512887 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -23,6 +23,34 @@ use std::{ use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; +pub trait FromBytes: Sized { + type Buffer: AsMut<[u8]> + Default; + fn from_le_bytes(bs: Self::Buffer) -> Self; + fn from_be_bytes(bs: Self::Buffer) -> Self; + fn from_ne_bytes(bs: Self::Buffer) -> Self; +} + +macro_rules! from_le_bytes { + ($($ty: ty),*) => { + $( + impl FromBytes for $ty { + type Buffer = [u8; size_of::()]; + fn from_le_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_le_bytes(bs) + } + fn from_be_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_be_bytes(bs) + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + <$ty>::from_ne_bytes(bs) + } + } + )* + }; +} + +from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64 } + /// Reads `$size` of bytes from `$src`, and reinterprets them as type `$ty`, in /// little-endian order. `$ty` must implement the `Default` trait. Otherwise this won't /// compile. @@ -30,15 +58,9 @@ use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; macro_rules! read_num_bytes { ($ty:ty, $size:expr, $src:expr) => {{ assert!($size <= $src.len()); - let mut data: $ty = Default::default(); - unsafe { - std::ptr::copy_nonoverlapping( - $src.as_ptr(), - &mut data as *mut $ty as *mut u8, - $size, - ); - } - data + let mut buffer = <$ty as $crate::util::bit_util::FromBytes>::Buffer::default(); + buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]); + <$ty>::from_ne_bytes(buffer) }}; } @@ -553,7 +575,7 @@ impl BitReader { /// Returns `Some` if there's enough bytes left to form a value of `T`. /// Otherwise `None`. #[inline] - pub fn get_aligned(&mut self, num_bytes: usize) -> Option { + pub fn get_aligned(&mut self, num_bytes: usize) -> Option { let bytes_read = ceil(self.bit_offset as i64, 8) as usize; if self.byte_offset + bytes_read + num_bytes > self.total_bytes { return None; @@ -993,7 +1015,7 @@ mod tests { fn test_put_aligned_rand_numbers(total: usize, num_bits: usize) where - T: Copy + Default + Debug + PartialEq, + T: Copy + FromBytes + Debug + PartialEq, Standard: Distribution, { assert!(num_bits <= 32); From 44bf359741d29442630fe9a8bbd842c918d19f0f Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 15:48:18 +0100 Subject: [PATCH 04/20] Remove unnecessary unsafe in memcpy functions --- rust/parquet/src/data_type.rs | 14 ++++++++++++-- rust/parquet/src/util/bit_util.rs | 31 +++++++++++++++---------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index d7e20fa5533..a04eec89601 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -303,6 +303,12 @@ pub trait AsBytes { fn as_bytes(&self) -> &[u8]; } +impl AsBytes for [u8] { + fn as_bytes(&self) -> &[u8] { + self + } +} + macro_rules! gen_as_bytes { ($source_ty:ident) => { impl AsBytes for $source_ty { @@ -319,10 +325,14 @@ macro_rules! gen_as_bytes { } gen_as_bytes!(bool); -gen_as_bytes!(u8); +gen_as_bytes!(i8); +gen_as_bytes!(i16); gen_as_bytes!(i32); -gen_as_bytes!(u32); gen_as_bytes!(i64); +gen_as_bytes!(u8); +gen_as_bytes!(u16); +gen_as_bytes!(u32); +gen_as_bytes!(u64); gen_as_bytes!(f32); gen_as_bytes!(f64); diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index c737c512887..b93d8841ba3 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -20,6 +20,7 @@ use std::{ mem::{size_of, transmute_copy}, }; +use crate::data_type::AsBytes; use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; @@ -67,35 +68,33 @@ macro_rules! read_num_bytes { /// Converts value `val` of type `T` to a byte vector, by reading `num_bytes` from `val`. /// NOTE: if `val` is less than the size of `T` then it can be truncated. #[inline] -pub fn convert_to_bytes(val: &T, num_bytes: usize) -> Vec { +pub fn convert_to_bytes(val: &T, num_bytes: usize) -> Vec +where + T: ?Sized + AsBytes, +{ let mut bytes: Vec = vec![0; num_bytes]; - memcpy_value(val, num_bytes, &mut bytes); + memcpy_value(val.as_bytes(), num_bytes, &mut bytes); bytes } #[inline] pub fn memcpy(source: &[u8], target: &mut [u8]) { assert!(target.len() >= source.len()); - unsafe { - std::ptr::copy_nonoverlapping(source.as_ptr(), target.as_mut_ptr(), source.len()) - } + target[..source.len()].copy_from_slice(source) } #[inline] -pub fn memcpy_value(source: &T, num_bytes: usize, target: &mut [u8]) { +pub fn memcpy_value(source: &T, num_bytes: usize, target: &mut [u8]) +where + T: ?Sized + AsBytes, +{ assert!( target.len() >= num_bytes, "Not enough space. Only had {} bytes but need to put {} bytes", target.len(), num_bytes ); - unsafe { - std::ptr::copy_nonoverlapping( - source as *const T as *const u8, - target.as_mut_ptr(), - num_bytes, - ) - } + memcpy(&source.as_bytes()[..num_bytes], target) } /// Returns the ceil of value/divisor @@ -338,7 +337,7 @@ impl BitWriter { /// /// Returns false if there's not enough room left. True otherwise. #[inline] - pub fn put_aligned(&mut self, val: T, num_bytes: usize) -> bool { + pub fn put_aligned(&mut self, val: T, num_bytes: usize) -> bool { let result = self.get_next_byte_ptr(num_bytes); if result.is_err() { // TODO: should we return `Result` for this func? @@ -358,7 +357,7 @@ impl BitWriter { /// Returns false if there's not enough room left, or the `pos` is not valid. /// True otherwise. #[inline] - pub fn put_aligned_offset( + pub fn put_aligned_offset( &mut self, val: T, num_bytes: usize, @@ -1015,7 +1014,7 @@ mod tests { fn test_put_aligned_rand_numbers(total: usize, num_bits: usize) where - T: Copy + FromBytes + Debug + PartialEq, + T: Copy + FromBytes + AsBytes + Debug + PartialEq, Standard: Distribution, { assert!(num_bits <= 32); From 27b7b2939f5f19e0df63b55a644dfdfd710f1884 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 16:14:30 +0100 Subject: [PATCH 05/20] fix: Don't allow arbitrary transmutes when decoding This may cause panics in code using ByteArray or Int96 but no tests currently test these paths. Still better than the status quo which would be undefined behaviour (if the replaced code paths were hit). --- rust/parquet/src/data_type.rs | 36 +++++++++++++++++-- rust/parquet/src/encodings/decoding.rs | 12 +++++-- rust/parquet/src/encodings/rle.rs | 6 ++-- rust/parquet/src/util/bit_util.rs | 49 ++++++++++++++++++-------- 4 files changed, 81 insertions(+), 22 deletions(-) diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index a04eec89601..9fdaff1ba44 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -26,7 +26,10 @@ use crate::basic::Type; use crate::column::reader::{ColumnReader, ColumnReaderImpl}; use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; -use crate::util::memory::{ByteBuffer, ByteBufferPtr}; +use crate::util::{ + bit_util::FromBytes, + memory::{ByteBuffer, ByteBufferPtr}, +}; use std::str::from_utf8; /// Rust representation for logical type INT96, value is backed by an array of `u32`. @@ -381,7 +384,8 @@ pub trait DataType: 'static { + std::fmt::Debug + std::default::Default + std::clone::Clone - + AsBytes; + + AsBytes + + FromBytes; /// Returns Parquet physical type. fn get_physical_type() -> Type; @@ -531,6 +535,34 @@ make_type!( mem::size_of::() ); +impl FromBytes for Int96 { + type Buffer = [u8; 12]; + fn from_le_bytes(_bs: Self::Buffer) -> Self { + unimplemented!() + } + fn from_be_bytes(_bs: Self::Buffer) -> Self { + unimplemented!() + } + fn from_ne_bytes(_bs: Self::Buffer) -> Self { + unimplemented!() + } +} + +// FIXME Needed to satisfy the constraint of many decoding functions but ByteArray does not +// appear to actual be converted directly from bytes +impl FromBytes for ByteArray { + type Buffer = [u8; 8]; + fn from_le_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_be_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_ne_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index c3d5d7c83f8..949ddbfd525 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -28,7 +28,7 @@ use crate::data_type::*; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::{ - bit_util::{self, BitReader}, + bit_util::{self, BitReader, FromBytes}, memory::{ByteBuffer, ByteBufferPtr}, }; @@ -541,7 +541,10 @@ impl DeltaBitPackDecoder { /// Loads delta into mini block. #[inline] - fn load_deltas_in_mini_block(&mut self) -> Result<()> { + fn load_deltas_in_mini_block(&mut self) -> Result<()> + where + T::T: FromBytes, + { self.deltas_in_mini_block.clear(); if self.use_batch { self.deltas_in_mini_block @@ -566,7 +569,10 @@ impl DeltaBitPackDecoder { } } -impl Decoder for DeltaBitPackDecoder { +impl Decoder for DeltaBitPackDecoder +where + T::T: FromBytes, +{ // # of total values is derived from encoding #[inline] default fn set_data(&mut self, data: ByteBufferPtr, _: usize) -> Result<()> { diff --git a/rust/parquet/src/encodings/rle.rs b/rust/parquet/src/encodings/rle.rs index 8463df7876a..2278cbfaa66 100644 --- a/rust/parquet/src/encodings/rle.rs +++ b/rust/parquet/src/encodings/rle.rs @@ -22,7 +22,7 @@ use std::{ use crate::errors::{ParquetError, Result}; use crate::util::{ - bit_util::{self, BitReader, BitWriter}, + bit_util::{self, BitReader, BitWriter, FromBytes}, memory::ByteBufferPtr, }; @@ -369,7 +369,7 @@ impl RleDecoder { } #[inline] - pub fn get(&mut self) -> Result> { + pub fn get(&mut self) -> Result> { assert!(size_of::() <= 8); while self.rle_left <= 0 && self.bit_packed_left <= 0 { @@ -402,7 +402,7 @@ impl RleDecoder { } #[inline] - pub fn get_batch(&mut self, buffer: &mut [T]) -> Result { + pub fn get_batch(&mut self, buffer: &mut [T]) -> Result { assert!(self.bit_reader.is_some()); assert!(size_of::() <= 8); diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index b93d8841ba3..3baa0322795 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -15,15 +15,22 @@ // specific language governing permissions and limitations // under the License. -use std::{ - cmp, - mem::{size_of, transmute_copy}, -}; +use std::{cmp, mem::size_of}; use crate::data_type::AsBytes; use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; +fn from_ne_slice(bs: &[u8]) -> T { + let mut b = T::Buffer::default(); + { + let b = b.as_mut(); + let bs = &bs[..b.len()]; + b.copy_from_slice(bs); + } + T::from_ne_bytes(b) +} + pub trait FromBytes: Sized { type Buffer: AsMut<[u8]> + Default; fn from_le_bytes(bs: Self::Buffer) -> Self; @@ -50,7 +57,24 @@ macro_rules! from_le_bytes { }; } -from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64 } +impl FromBytes for bool { + type Buffer = [u8; 1]; + fn from_le_bytes(bs: Self::Buffer) -> Self { + Self::from_ne_bytes(bs) + } + fn from_be_bytes(bs: Self::Buffer) -> Self { + Self::from_ne_bytes(bs) + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + match bs[0] { + 0 => false, + 1 => true, + _ => panic!("Invalid byte when reading bool"), + } + } +} + +from_le_bytes! { u8, u16, u32, u64, i8, i16, i32, i64, f32, f64 } /// Reads `$size` of bytes from `$src`, and reinterprets them as type `$ty`, in /// little-endian order. `$ty` must implement the `Default` trait. Otherwise this won't @@ -465,7 +489,7 @@ impl BitReader { /// /// Returns `None` if there's not enough data available. `Some` otherwise. #[inline] - pub fn get_value(&mut self, num_bits: usize) -> Option { + pub fn get_value(&mut self, num_bits: usize) -> Option { assert!(num_bits <= 64); assert!(num_bits <= size_of::() * 8); @@ -487,12 +511,11 @@ impl BitReader { } // TODO: better to avoid copying here - let result: T = unsafe { transmute_copy::(&v) }; - Some(result) + Some(from_ne_slice(v.as_bytes())) } #[inline] - pub fn get_batch(&mut self, batch: &mut [T], num_bits: usize) -> usize { + pub fn get_batch(&mut self, batch: &mut [T], num_bits: usize) -> usize { assert!(num_bits <= 32); assert!(num_bits <= size_of::() * 8); @@ -965,7 +988,7 @@ mod tests { fn test_get_batch_helper(total: usize, num_bits: usize) where - T: Default + Clone + Debug + Eq, + T: FromBytes + Default + Clone + Debug + Eq, { assert!(num_bits <= 32); let num_bytes = ceil(num_bits as i64, 8); @@ -977,10 +1000,8 @@ mod tests { .collect(); // Generic values used to check against actual values read from `get_batch`. - let expected_values: Vec = values - .iter() - .map(|v| unsafe { transmute_copy::(&v) }) - .collect(); + let expected_values: Vec = + values.iter().map(|v| from_ne_slice(v.as_bytes())).collect(); for i in 0..total { assert!(writer.put_value(values[i] as u64, num_bits)); From 20e62c4f94676eb758af229ad7f2ad7bf02c9557 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 17:12:18 +0100 Subject: [PATCH 06/20] Remove unnecessary unsafe in Int96 construction --- rust/parquet/src/data_type.rs | 12 +++++++++--- rust/parquet/src/file/statistics.rs | 13 +++---------- rust/parquet/src/util/bit_util.rs | 2 +- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index 9fdaff1ba44..8b37cfa1b27 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -27,7 +27,7 @@ use crate::column::reader::{ColumnReader, ColumnReaderImpl}; use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; use crate::util::{ - bit_util::FromBytes, + bit_util::{from_ne_slice, FromBytes}, memory::{ByteBuffer, ByteBufferPtr}, }; use std::str::from_utf8; @@ -543,8 +543,14 @@ impl FromBytes for Int96 { fn from_be_bytes(_bs: Self::Buffer) -> Self { unimplemented!() } - fn from_ne_bytes(_bs: Self::Buffer) -> Self { - unimplemented!() + fn from_ne_bytes(bs: Self::Buffer) -> Self { + let mut i = Int96::new(); + i.set_data( + from_ne_slice(&bs[0..4]), + from_ne_slice(&bs[4..8]), + from_ne_slice(&bs[8..12]), + ); + i } } diff --git a/rust/parquet/src/file/statistics.rs b/rust/parquet/src/file/statistics.rs index 421361cddb5..edd3c63f24f 100644 --- a/rust/parquet/src/file/statistics.rs +++ b/rust/parquet/src/file/statistics.rs @@ -44,6 +44,7 @@ use parquet_format::Statistics as TStatistics; use crate::basic::Type; use crate::data_type::*; +use crate::util::bit_util::from_ne_slice; // Macro to generate methods create Statistics. macro_rules! statistics_new_func { @@ -148,19 +149,11 @@ pub fn from_thrift( // min/max statistics for INT96 columns. let min = min.map(|data| { assert_eq!(data.len(), 12); - unsafe { - let raw = - std::slice::from_raw_parts(data.as_ptr() as *mut u32, 3); - Int96::from(Vec::from(raw)) - } + from_ne_slice::(&data) }); let max = max.map(|data| { assert_eq!(data.len(), 12); - unsafe { - let raw = - std::slice::from_raw_parts(data.as_ptr() as *mut u32, 3); - Int96::from(Vec::from(raw)) - } + from_ne_slice::(&data) }); Statistics::int96(min, max, distinct_count, null_count, old_format) } diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index 3baa0322795..2d848e85a8e 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -21,7 +21,7 @@ use crate::data_type::AsBytes; use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; -fn from_ne_slice(bs: &[u8]) -> T { +pub fn from_ne_slice(bs: &[u8]) -> T { let mut b = T::Buffer::default(); { let b = b.as_mut(); From 4df3f2bcc1939c29770fe7d6c4c2b50423e8fb75 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 24 Jan 2020 17:13:40 +0100 Subject: [PATCH 07/20] fix: Don't trust parquet data to be UTF-8 --- rust/parquet/src/record/api.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/parquet/src/record/api.rs b/rust/parquet/src/record/api.rs index bf7dc72e397..c011da8bf50 100644 --- a/rust/parquet/src/record/api.rs +++ b/rust/parquet/src/record/api.rs @@ -551,8 +551,7 @@ impl Field { match descr.physical_type() { PhysicalType::BYTE_ARRAY => match descr.logical_type() { LogicalType::UTF8 | LogicalType::ENUM | LogicalType::JSON => { - let value = - unsafe { String::from_utf8_unchecked(value.data().to_vec()) }; + let value = String::from_utf8(value.data().to_vec()).unwrap(); Field::Str(value) } LogicalType::BSON | LogicalType::NONE => Field::Bytes(value), From 5a6977360f9eabd25ba8f6d6100de34f0b4887cd Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 15:00:09 +0100 Subject: [PATCH 08/20] refactor: Remove unnecessary unsafe in decoding --- rust/parquet/src/encodings/decoding.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index 949ddbfd525..9b8b600b32f 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -934,7 +934,7 @@ impl Decoder for DeltaByteArrayDecoder [u8; 4] { - unsafe { mem::transmute::(v as u32) } + (v as u32).to_ne_bytes() } /// A util trait to convert slices of different types to byte arrays @@ -1488,11 +1488,7 @@ mod tests { fn to_byte_array(data: &[Int96]) -> Vec { let mut v = vec![]; for d in data { - unsafe { - let copy = - std::slice::from_raw_parts(d.data().as_ptr() as *const u8, 12); - v.extend_from_slice(copy); - }; + v.extend_from_slice(d.as_bytes()); } v } From fb719ebcbf1545b7475b77af9a2e449a453e15d4 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 15:24:14 +0100 Subject: [PATCH 09/20] fix: Restrict which types can be transmuted to byte slices Some types such as `&[Int96]` and `&[ByteArray]` can't be transmuted to a `&[u8]` as they aren't plain old data. `&mut bool` can't be transmuted to a `&mut u8` since that would allow writing values other than 0 or 1 to it. --- rust/parquet/src/data_type.rs | 46 +++++++++++++++++++++++++- rust/parquet/src/encodings/decoding.rs | 32 +++++++++--------- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index 8b37cfa1b27..c0870d0065c 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -306,6 +306,13 @@ pub trait AsBytes { fn as_bytes(&self) -> &[u8]; } +/// Converts an slice of a data type to a slice of bytes. +pub trait SliceAsBytes: Sized { + /// Returns slice of bytes for a slice of this data type. + fn slice_as_bytes(self_: &[Self]) -> &[u8]; + fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8]; +} + impl AsBytes for [u8] { fn as_bytes(&self) -> &[u8] { self @@ -324,10 +331,27 @@ macro_rules! gen_as_bytes { } } } + impl SliceAsBytes for $source_ty { + fn slice_as_bytes(self_: &[Self]) -> &[u8] { + unsafe { + std::slice::from_raw_parts( + self_.as_ptr() as *const u8, + std::mem::size_of::<$source_ty>() * self_.len(), + ) + } + } + fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] { + unsafe { + std::slice::from_raw_parts_mut( + self_.as_mut_ptr() as *mut u8, + std::mem::size_of::<$source_ty>() * self_.len(), + ) + } + } + } }; } -gen_as_bytes!(bool); gen_as_bytes!(i8); gen_as_bytes!(i16); gen_as_bytes!(i32); @@ -339,6 +363,12 @@ gen_as_bytes!(u64); gen_as_bytes!(f32); gen_as_bytes!(f64); +impl AsBytes for bool { + fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) } + } +} + impl AsBytes for Int96 { fn as_bytes(&self) -> &[u8] { unsafe { @@ -414,6 +444,20 @@ pub trait DataType: 'static { Self: Sized; } +// Workaround bug in specialization +pub trait SliceAsBytesDataType: DataType +where + Self::T: SliceAsBytes, +{ +} + +impl SliceAsBytesDataType for T +where + T: DataType, + ::T: SliceAsBytes, +{ +} + macro_rules! make_type { ($name:ident, $physical_ty:path, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => { pub struct $name {} diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index 9b8b600b32f..647d423f72e 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -17,7 +17,7 @@ //! Contains all supported decoders for Parquet. -use std::{cmp, marker::PhantomData, mem, slice::from_raw_parts_mut}; +use std::{cmp, marker::PhantomData, mem}; use super::rle::RleDecoder; @@ -188,7 +188,17 @@ impl Decoder for PlainDecoder { } #[inline] - default fn get(&mut self, buffer: &mut [T::T]) -> Result { + default fn get(&mut self, _buffer: &mut [T::T]) -> Result { + unreachable!() + } +} + +impl Decoder for PlainDecoder +where + T::T: SliceAsBytes, +{ + #[inline] + fn get(&mut self, buffer: &mut [T::T]) -> Result { assert!(self.data.is_some()); let data = self.data.as_mut().unwrap(); @@ -198,8 +208,7 @@ impl Decoder for PlainDecoder { if bytes_left < bytes_to_decode { return Err(eof_err!("Not enough bytes to decode")); } - let raw_buffer: &mut [u8] = - unsafe { from_raw_parts_mut(buffer.as_ptr() as *mut u8, bytes_to_decode) }; + let raw_buffer = &mut T::T::slice_as_bytes_mut(buffer)[..bytes_to_decode]; raw_buffer.copy_from_slice(data.range(self.start, bytes_to_decode).as_ref()); self.start += bytes_to_decode; self.num_values -= num_values; @@ -245,7 +254,7 @@ impl Decoder for PlainDecoder { Ok(()) } - fn get(&mut self, buffer: &mut [bool]) -> Result { + default fn get(&mut self, buffer: &mut [bool]) -> Result { assert!(self.bit_reader.is_some()); let bit_reader = self.bit_reader.as_mut().unwrap(); @@ -1454,18 +1463,11 @@ mod tests { impl ToByteArray for T where - T: DataType, + T: SliceAsBytesDataType, + ::T: SliceAsBytes, { default fn to_byte_array(data: &[T::T]) -> Vec { - let mut v = vec![]; - let type_len = std::mem::size_of::(); - v.extend_from_slice(unsafe { - std::slice::from_raw_parts( - data.as_ptr() as *const u8, - data.len() * type_len, - ) - }); - v + ::T::slice_as_bytes(data).to_vec() } } From c9996be17d2d4c030c46b824c541c8532cfdebd2 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 16:12:09 +0100 Subject: [PATCH 10/20] refactor: Remove unnecessary unsafe from rle encoding --- rust/parquet/src/encodings/rle.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/rust/parquet/src/encodings/rle.rs b/rust/parquet/src/encodings/rle.rs index 2278cbfaa66..26df49f70fb 100644 --- a/rust/parquet/src/encodings/rle.rs +++ b/rust/parquet/src/encodings/rle.rs @@ -15,14 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{ - cmp, - mem::{size_of, transmute_copy}, -}; +use std::{cmp, mem::size_of}; use crate::errors::{ParquetError, Result}; use crate::util::{ - bit_util::{self, BitReader, BitWriter, FromBytes}, + bit_util::{self, from_ne_slice, BitReader, BitWriter, FromBytes}, memory::ByteBufferPtr, }; @@ -379,13 +376,13 @@ impl RleDecoder { } let value = if self.rle_left > 0 { - let rle_value = unsafe { - transmute_copy::( - self.current_value - .as_mut() - .expect("current_value should be Some"), - ) - }; + let rle_value = from_ne_slice( + &self + .current_value + .as_mut() + .expect("current_value should be Some") + .to_ne_bytes(), + ); self.rle_left -= 1; rle_value } else { @@ -413,9 +410,9 @@ impl RleDecoder { let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize); for i in 0..num_values { - let repeated_value = unsafe { - transmute_copy::(self.current_value.as_mut().unwrap()) - }; + let repeated_value = from_ne_slice( + &self.current_value.as_mut().unwrap().to_ne_bytes(), + ); buffer[values_read + i] = repeated_value; } self.rle_left -= num_values as u32; From fc0d0d48082471cc5d57be94034b52ceded5f427 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 18:56:16 +0100 Subject: [PATCH 11/20] fix: Prevent aliasing of &mut with FatPtr by threading lifetimes --- rust/parquet/src/arrow/record_reader.rs | 62 +++++++++++++------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index de42ae7f953..5c7a8b08eac 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,8 +16,8 @@ // under the License. use std::cmp::{max, min}; +use std::mem::align_of; use std::mem::size_of; -use std::mem::transmute; use std::mem::{replace, swap}; use std::slice; @@ -28,6 +28,7 @@ use crate::schema::types::ColumnDescPtr; use arrow::array::{BooleanBufferBuilder, BufferBuilderTrait}; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::memory; const MIN_BATCH_SIZE: usize = 1024; @@ -53,39 +54,39 @@ pub struct RecordReader { } #[derive(Debug)] -struct FatPtr { - ptr: *const T, - len: usize, +struct FatPtr<'a, T> { + ptr: &'a mut [T], } -impl FatPtr { - fn new(ptr: *const T, len: usize) -> Self { - Self { ptr, len } +impl<'a, T> FatPtr<'a, T> { + fn new(ptr: &'a mut [T]) -> Self { + Self { ptr } } - fn with_offset(buf: &MutableBuffer, offset: usize) -> Self { + fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self { FatPtr::::with_offset_and_size(buf, offset, size_of::()) } fn with_offset_and_size( - buf: &MutableBuffer, + buf: &'a mut MutableBuffer, offset: usize, type_size: usize, ) -> Self { + assert!(align_of::() <= memory::ALIGNMENT); unsafe { - FatPtr::new( - transmute::<*const u8, *mut T>(buf.raw_data()).add(offset), + FatPtr::new(slice::from_raw_parts_mut( + &mut *(buf.raw_data() as *mut T).add(offset), buf.capacity() / type_size - offset, - ) + )) } } fn to_slice(&self) -> &[T] { - unsafe { slice::from_raw_parts(self.ptr, self.len) } + self.ptr } - fn to_slice_mut(&self) -> &mut [T] { - unsafe { slice::from_raw_parts_mut(self.ptr as *mut T, self.len) } + fn to_slice_mut(&mut self) -> &mut [T] { + self.ptr } } @@ -198,10 +199,10 @@ impl RecordReader { ); new_buffer.resize(num_left_values * size_of::())?; - let new_def_levels = FatPtr::::with_offset(&new_buffer, 0); + let mut new_def_levels = FatPtr::::with_offset(&mut new_buffer, 0); let new_def_levels = new_def_levels.to_slice_mut(); let left_def_levels = - FatPtr::::with_offset(&def_levels_buf, self.num_values); + FatPtr::::with_offset(def_levels_buf, self.num_values); let left_def_levels = left_def_levels.to_slice(); new_def_levels[0..num_left_values] @@ -227,10 +228,10 @@ impl RecordReader { ); new_buffer.resize(num_left_values * size_of::())?; - let new_rep_levels = FatPtr::::with_offset(&new_buffer, 0); + let mut new_rep_levels = FatPtr::::with_offset(&mut new_buffer, 0); let new_rep_levels = new_rep_levels.to_slice_mut(); let left_rep_levels = - FatPtr::::with_offset(&rep_levels_buf, self.num_values); + FatPtr::::with_offset(rep_levels_buf, self.num_values); let left_rep_levels = left_rep_levels.to_slice(); new_rep_levels[0..num_left_values] @@ -254,11 +255,11 @@ impl RecordReader { let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); new_buffer.resize(num_left_values * T::get_type_size())?; - let new_records = - FatPtr::::with_offset_and_size(&new_buffer, 0, T::get_type_size()); + let mut new_records = + FatPtr::::with_offset_and_size(&mut new_buffer, 0, T::get_type_size()); let new_records = new_records.to_slice_mut(); - let left_records = FatPtr::::with_offset_and_size( - &self.records, + let mut left_records = FatPtr::::with_offset_and_size( + &mut self.records, self.num_values, T::get_type_size(), ); @@ -336,21 +337,22 @@ impl RecordReader { } // Convert mutable buffer spaces to mutable slices - let values_buf = FatPtr::::with_offset_and_size( - &self.records, + let mut values_buf = FatPtr::::with_offset_and_size( + &mut self.records, self.values_written, T::get_type_size(), ); + let values_written = self.values_written; let mut def_levels_buf = self .def_levels - .as_ref() - .map(|buf| FatPtr::::with_offset(buf, self.values_written)); + .as_mut() + .map(|buf| FatPtr::::with_offset(buf, values_written)); let mut rep_levels_buf = self .rep_levels - .as_ref() - .map(|buf| FatPtr::::with_offset(buf, self.values_written)); + .as_mut() + .map(|buf| FatPtr::::with_offset(buf, values_written)); let (values_read, levels_read) = self.column_reader.as_mut().unwrap().read_batch( @@ -421,7 +423,7 @@ impl RecordReader { fn split_records(&mut self, records_to_read: usize) -> Result { let rep_levels_buf = self .rep_levels - .as_ref() + .as_mut() .map(|buf| FatPtr::::with_offset(buf, 0)); let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice()); From b73debc7f1ed2de23916ba229336b057e900dc0b Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 19:44:17 +0100 Subject: [PATCH 12/20] fix: Don't alias &mut from bit_writer --- rust/parquet/src/encodings/encoding.rs | 8 ++------ rust/parquet/src/util/bit_util.rs | 5 +++++ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs index c9cad12e42d..dd88c445f33 100644 --- a/rust/parquet/src/encodings/encoding.rs +++ b/rust/parquet/src/encodings/encoding.rs @@ -634,11 +634,7 @@ impl DeltaBitPackEncoder { self.bit_writer.put_zigzag_vlq_int(min_delta); // Slice to store bit width for each mini block - // apply unsafe allocation to avoid double mutable borrow - let mini_block_widths: &mut [u8] = unsafe { - let tmp_slice = self.bit_writer.get_next_byte_ptr(self.num_mini_blocks)?; - slice::from_raw_parts_mut(tmp_slice.as_ptr() as *mut u8, self.num_mini_blocks) - }; + let offset = self.bit_writer.skip(self.num_mini_blocks)?; for i in 0..self.num_mini_blocks { // Find how many values we need to encode - either block size or whatever @@ -657,7 +653,7 @@ impl DeltaBitPackEncoder { // Compute bit width to store (max_delta - min_delta) let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)); - mini_block_widths[i] = bit_width as u8; + self.bit_writer.write_at(offset + i, bit_width as u8); // Encode values in current mini block using min_delta and bit_width for j in 0..n { diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index 2d848e85a8e..34541af3dc1 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -321,6 +321,10 @@ impl BitWriter { self.max_bytes } + pub fn write_at(&mut self, offset: usize, value: u8) { + self.buffer[offset] = value; + } + /// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer. /// The `num_bits` must not be greater than 64. This is bit packed. /// @@ -541,6 +545,7 @@ impl BitReader { unsafe { let in_buf = &self.buffer.data()[self.byte_offset..]; let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32; + // FIXME assert!(memory::is_ptr_aligned(in_ptr)); if size_of::() == 4 { while values_to_read - i >= 32 { let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32; From 92e32050a36a0814901dfde8ab37b638b45ac512 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 19:48:53 +0100 Subject: [PATCH 13/20] refactor: Avoid unsafe in put --- rust/parquet/src/encodings/encoding.rs | 28 +++++++++++++++----------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs index dd88c445f33..98c61a05329 100644 --- a/rust/parquet/src/encodings/encoding.rs +++ b/rust/parquet/src/encodings/encoding.rs @@ -17,7 +17,7 @@ //! Contains all supported encoders for Parquet. -use std::{cmp, io::Write, marker::PhantomData, mem, slice}; +use std::{cmp, io::Write, marker::PhantomData, mem}; use crate::basic::*; use crate::data_type::*; @@ -129,17 +129,6 @@ impl PlainEncoder { } impl Encoder for PlainEncoder { - default fn put(&mut self, values: &[T::T]) -> Result<()> { - let bytes = unsafe { - slice::from_raw_parts( - values as *const [T::T] as *const u8, - mem::size_of::() * values.len(), - ) - }; - self.buffer.write(bytes)?; - Ok(()) - } - fn encoding(&self) -> Encoding { Encoding::PLAIN } @@ -156,6 +145,21 @@ impl Encoder for PlainEncoder { Ok(self.buffer.consume()) } + + default fn put(&mut self, _values: &[T::T]) -> Result<()> { + unreachable!() + } +} + +impl Encoder for PlainEncoder +where + T::T: SliceAsBytes, +{ + default fn put(&mut self, values: &[T::T]) -> Result<()> { + let bytes = T::T::slice_as_bytes(values); + self.buffer.write(bytes)?; + Ok(()) + } } impl Encoder for PlainEncoder { From 46a894b5993021d6d304c3b2f05d756c894f8dc7 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 20:20:24 +0100 Subject: [PATCH 14/20] refactor: Avoid unsafe when casting RecordReader --- rust/parquet/src/arrow/array_reader.rs | 139 +++++++----------------- rust/parquet/src/arrow/record_reader.rs | 20 ++++ 2 files changed, 61 insertions(+), 98 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 342e3642c92..5f57d0067a4 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -151,119 +151,62 @@ impl ArrayReader for PrimitiveArrayReader { // convert to arrays let array = match (&self.data_type, T::get_physical_type()) { - (ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe { - BoolConverter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int8, PhysicalType::INT32) => unsafe { - Int8Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int16, PhysicalType::INT32) => unsafe { - Int16Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int32, PhysicalType::INT32) => unsafe { - Int32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt8, PhysicalType::INT32) => unsafe { - UInt8Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt16, PhysicalType::INT32) => unsafe { - UInt16Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt32, PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Int64, PhysicalType::INT64) => unsafe { - Int64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::UInt64, PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Float32, PhysicalType::FLOAT) => unsafe { - Float32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, - (ArrowType::Float64, PhysicalType::DOUBLE) => unsafe { - Float64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) - }, + (ArrowType::Boolean, PhysicalType::BOOLEAN) => { + BoolConverter::convert(self.record_reader.cast::()) + } + (ArrowType::Int8, PhysicalType::INT32) => { + Int8Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int16, PhysicalType::INT32) => { + Int16Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int32, PhysicalType::INT32) => { + Int32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt8, PhysicalType::INT32) => { + UInt8Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt16, PhysicalType::INT32) => { + UInt16Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt32, PhysicalType::INT32) => { + UInt32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Int64, PhysicalType::INT64) => { + Int64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::UInt64, PhysicalType::INT64) => { + UInt64Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Float32, PhysicalType::FLOAT) => { + Float32Converter::convert(self.record_reader.cast::()) + } + (ArrowType::Float64, PhysicalType::DOUBLE) => { + Float64Converter::convert(self.record_reader.cast::()) + } (ArrowType::Timestamp(_, _), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt64Converter::convert(self.record_reader.cast::()) }, (ArrowType::Date32(_), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt32Converter::convert(self.record_reader.cast::()) }, (ArrowType::Date64(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt64Converter::convert(self.record_reader.cast::()) }, (ArrowType::Time32(_), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt32Converter::convert(self.record_reader.cast::()) }, (ArrowType::Time64(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt64Converter::convert(self.record_reader.cast::()) }, (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => unsafe { - UInt32Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt32Converter::convert(self.record_reader.cast::()) }, (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt64Converter::convert(self.record_reader.cast::()) }, (ArrowType::Duration(_), PhysicalType::INT64) => unsafe { - UInt64Converter::convert(transmute::< - &mut RecordReader, - &mut RecordReader, - >(&mut self.record_reader)) + UInt64Converter::convert(self.record_reader.cast::()) }, (arrow_type, physical_type) => Err(general_err!( "Reading {:?} type from parquet {:?} is not supported yet.", diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 5c7a8b08eac..3bf00e5b11f 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -122,6 +122,26 @@ impl RecordReader { } } + pub(crate) fn cast(&mut self) -> &mut RecordReader { + trait CastRecordReader { + fn cast(&mut self) -> &mut RecordReader; + } + + impl CastRecordReader for RecordReader { + default fn cast(&mut self) -> &mut RecordReader { + panic!("Attempted to cast RecordReader to the wrong type") + } + } + + impl CastRecordReader for RecordReader { + fn cast(&mut self) -> &mut RecordReader { + self + } + } + + CastRecordReader::::cast(self) + } + /// Set the current page reader. pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { self.column_reader = From 425ba07eb1c3436231ba9b3453c5c77acd4f9239 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Wed, 5 Feb 2020 20:33:06 +0100 Subject: [PATCH 15/20] refactor: Move unsafe into MutableBuffer --- rust/arrow/src/buffer.rs | 12 ++++++++++++ rust/parquet/src/arrow/array_reader.rs | 7 +------ rust/parquet/src/arrow/record_reader.rs | 1 + 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index df564f9583b..87481efbaa2 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -541,6 +541,18 @@ impl MutableBuffer { offset: 0, } } + + /// View buffer as typed slice. + pub fn typed_data_mut(&mut self) -> &mut [T] { + assert_eq!(self.len() % mem::size_of::(), 0); + assert!(memory::is_ptr_aligned::(self.raw_data() as *const T)); + unsafe { + from_raw_parts_mut( + self.raw_data() as *mut T, + self.len() / mem::size_of::(), + ) + } + } } impl Drop for MutableBuffer { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 5f57d0067a4..fe165b90667 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -19,10 +19,8 @@ use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::mem::size_of; -use std::mem::transmute; use std::rc::Rc; use std::result::Result::Ok; -use std::slice::from_raw_parts_mut; use std::sync::Arc; use std::vec::Vec; @@ -505,10 +503,7 @@ impl ArrayReader for StructArrayReader { let mut def_level_data_buffer = MutableBuffer::new(buffer_size); def_level_data_buffer.resize(buffer_size)?; - let def_level_data = unsafe { - let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); - from_raw_parts_mut(ptr, children_array_len) - }; + let def_level_data = def_level_data_buffer.typed_data_mut(); def_level_data .iter_mut() diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 3bf00e5b11f..2acc01ca33e 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -73,6 +73,7 @@ impl<'a, T> FatPtr<'a, T> { type_size: usize, ) -> Self { assert!(align_of::() <= memory::ALIGNMENT); + // TODO Prevent this from being called with non primitive types (like `Box`) unsafe { FatPtr::new(slice::from_raw_parts_mut( &mut *(buf.raw_data() as *mut T).add(offset), From c606440a53cacc73d088cb5f1309e289a2b8420a Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Mon, 10 Feb 2020 11:41:53 +0100 Subject: [PATCH 16/20] fix: Don't create a slice from a null pointer when calling Buffer::data --- rust/arrow/src/buffer.rs | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 87481efbaa2..beea089ddce 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -68,14 +68,11 @@ struct BufferData { impl PartialEq for BufferData { fn eq(&self, other: &BufferData) -> bool { - if self.len != other.len { - return false; - } if self.capacity != other.capacity { return false; } - unsafe { memory::memcmp(self.ptr, other.ptr, self.len) == 0 } + self.data() == other.data() } } @@ -106,6 +103,16 @@ impl Debug for BufferData { } } +impl BufferData { + fn data(&self) -> &[u8] { + if self.ptr.is_null() { + &[] + } else { + unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } + } +} + impl Buffer { /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. @@ -194,7 +201,7 @@ impl Buffer { /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + self.data.data() } /// Returns a slice of this buffer, starting from `offset`. @@ -511,12 +518,22 @@ impl MutableBuffer { /// Returns the data stored in this buffer as a slice. pub fn data(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + if self.data.is_null() { + &[] + } else { + unsafe { std::slice::from_raw_parts(self.raw_data(), self.len()) } + } } /// Returns the data stored in this buffer as a mutable slice. pub fn data_mut(&mut self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.raw_data() as *mut u8, self.len()) } + if self.data.is_null() { + &mut [] + } else { + unsafe { + std::slice::from_raw_parts_mut(self.raw_data() as *mut u8, self.len()) + } + } } /// Returns a raw pointer for this buffer. From 52cc391a1ee86f1923ccc07715ed6c8b58a00db5 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Mon, 10 Feb 2020 12:03:56 +0100 Subject: [PATCH 17/20] refactor: No need for unsafe in Debug impl for Buffer --- rust/arrow/src/buffer.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index beea089ddce..f3484de3d74 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -93,11 +93,7 @@ impl Debug for BufferData { self.ptr, self.len, self.capacity )?; - unsafe { - f.debug_list() - .entries(std::slice::from_raw_parts(self.ptr, self.len).iter()) - .finish()?; - } + f.debug_list().entries(self.data().iter()).finish()?; write!(f, " }}") } From 6990951bfdc996a17d109ed68083f884008eb088 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Thu, 13 Feb 2020 15:13:54 +0100 Subject: [PATCH 18/20] Fix fn data --- rust/arrow/src/buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index f3484de3d74..154e18c9985 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -197,7 +197,7 @@ impl Buffer { /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - self.data.data() + &self.data.data()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. From cfb8496a3ab09af8780866f0986dd8f7ddbd92bf Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Thu, 13 Feb 2020 15:25:22 +0100 Subject: [PATCH 19/20] Fix non-simd compilation --- rust/arrow/src/buffer.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 154e18c9985..bc0bf290981 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -27,9 +27,7 @@ use std::fmt::{Debug, Formatter}; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write}; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::slice::from_raw_parts; -#[cfg(feature = "simd")] -use std::slice::from_raw_parts_mut; +use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; use crate::array::{BufferBuilderTrait, UInt8BufferBuilder}; From 45cccc4693cd3b1ff439b79d2ae4c214c4ec8c16 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Tue, 14 Apr 2020 09:51:54 +0200 Subject: [PATCH 20/20] Fix bluss's review comments --- rust/arrow/src/buffer.rs | 11 +++++---- rust/parquet/src/arrow/array_reader.rs | 32 +++++++++++++------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index bc0bf290981..ba85c611f8b 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -201,7 +201,7 @@ impl Buffer { /// Returns a slice of this buffer, starting from `offset`. pub fn slice(&self, offset: usize) -> Self { assert!( - self.offset + offset <= self.len(), + offset <= self.len(), "the offset of the new Buffer cannot exceed the existing length" ); Self { @@ -524,9 +524,7 @@ impl MutableBuffer { if self.data.is_null() { &mut [] } else { - unsafe { - std::slice::from_raw_parts_mut(self.raw_data() as *mut u8, self.len()) - } + unsafe { std::slice::from_raw_parts_mut(self.raw_data_mut(), self.len()) } } } @@ -538,6 +536,10 @@ impl MutableBuffer { self.data } + pub fn raw_data_mut(&mut self) -> *mut u8 { + self.data + } + /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { let buffer_data = BufferData { @@ -688,6 +690,7 @@ mod tests { assert_eq!(empty_slice, buf4.data()); assert_eq!(0, buf4.len()); assert!(buf4.is_empty()); + assert_eq!(buf2.slice(2).data(), &[10]); } #[test] diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index fe165b90667..137b9ab6a3a 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -182,30 +182,30 @@ impl ArrayReader for PrimitiveArrayReader { (ArrowType::Float64, PhysicalType::DOUBLE) => { Float64Converter::convert(self.record_reader.cast::()) } - (ArrowType::Timestamp(_, _), PhysicalType::INT64) => unsafe { + (ArrowType::Timestamp(_, _), PhysicalType::INT64) => { UInt64Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Date32(_), PhysicalType::INT32) => unsafe { + } + (ArrowType::Date32(_), PhysicalType::INT32) => { UInt32Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Date64(_), PhysicalType::INT64) => unsafe { + } + (ArrowType::Date64(_), PhysicalType::INT64) => { UInt64Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Time32(_), PhysicalType::INT32) => unsafe { + } + (ArrowType::Time32(_), PhysicalType::INT32) => { UInt32Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Time64(_), PhysicalType::INT64) => unsafe { + } + (ArrowType::Time64(_), PhysicalType::INT64) => { UInt64Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => unsafe { + } + (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { UInt32Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => unsafe { + } + (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => { UInt64Converter::convert(self.record_reader.cast::()) - }, - (ArrowType::Duration(_), PhysicalType::INT64) => unsafe { + } + (ArrowType::Duration(_), PhysicalType::INT64) => { UInt64Converter::convert(self.record_reader.cast::()) - }, + } (arrow_type, physical_type) => Err(general_err!( "Reading {:?} type from parquet {:?} is not supported yet.", arrow_type,