From a0c4de4340591a4e05487b82e52364d56b77d028 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 4 Nov 2020 23:05:46 +0100 Subject: [PATCH 1/4] ARROW-10500: [Rust]: Refactor bit slice iterators --- rust/arrow/Cargo.toml | 1 + rust/arrow/src/buffer.rs | 64 ++-- rust/arrow/src/compute/kernels/aggregate.rs | 57 +-- rust/arrow/src/compute/kernels/boolean.rs | 2 +- rust/arrow/src/compute/util.rs | 8 +- rust/arrow/src/util/bit_chunk_iterator.rs | 257 -------------- rust/arrow/src/util/bit_slice_iterator.rs | 368 ++++++++++++++++++++ rust/arrow/src/util/mod.rs | 2 +- rust/datafusion/tests/sql.rs | 34 +- 9 files changed, 474 insertions(+), 319 deletions(-) delete mode 100644 rust/arrow/src/util/bit_chunk_iterator.rs create mode 100644 rust/arrow/src/util/bit_slice_iterator.rs diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 71445768207..86f1fac627d 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -48,6 +48,7 @@ lazy_static = "1.4" packed_simd = { version = "0.3.4", optional = true, package = "packed_simd_2" } chrono = "0.4" flatbuffers = "0.6" +bitvec = "0.19" hex = "0.4" prettytable-rs = { version = "0.8.0", optional = true } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index d5b824e2992..08fb582d35c 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -33,9 +33,10 @@ use std::sync::Arc; use crate::datatypes::ArrowNativeType; use crate::error::{ArrowError, Result}; use crate::memory; -use crate::util::bit_chunk_iterator::BitChunks; +use crate::util::bit_slice_iterator::*; use crate::util::bit_util; use crate::util::bit_util::ceil; + #[cfg(feature = "simd")] use std::borrow::BorrowMut; @@ -258,16 +259,20 @@ impl Buffer { /// Returns a slice of this buffer starting at a certain bit offset. /// If the offset is byte-aligned the returned buffer is a shallow clone, /// otherwise a new buffer is allocated and filled with a copy of the bits in the range. - pub fn bit_slice(&self, offset: usize, len: usize) -> Self { - if offset % 8 == 0 && len % 8 == 0 { - return self.slice(offset / 8); + pub fn bit_view(&self, offset_in_bits: usize, len_in_bits: usize) -> Self { + if offset_in_bits % 8 == 0 && len_in_bits % 8 == 0 { + self.slice(offset_in_bits / 8) + } else { + self.bit_slice() + .view(offset_in_bits, len_in_bits) + .as_buffer() } - - bitwise_unary_op_helper(&self, offset, len, |a| a) } - pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks { - BitChunks::new(&self, offset, len) + /// Gives bit slice of the underlying buffer + /// This method can be used to get bit views for bit operations on the immutable view over the buffer. + pub fn bit_slice(&self) -> BufferBitSlice { + BufferBitSlice::new(self.data.data()) } /// Returns an empty buffer. @@ -401,20 +406,27 @@ where let mut result = MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false); - let left_chunks = left.bit_chunks(left_offset_in_bits, len_in_bits); - let right_chunks = right.bit_chunks(right_offset_in_bits, len_in_bits); + let left_slice = left.bit_slice().view(left_offset_in_bits, len_in_bits); + let left_chunks = left_slice.chunks::(); + + let right_slice = right.bit_slice().view(right_offset_in_bits, len_in_bits); + let right_chunks = right_slice.chunks::(); + + let remainder_bytes = ceil(left_chunks.remainder_bit_len(), 8); + let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); + let rem = &rem.to_ne_bytes()[0..remainder_bytes]; + + let left_chunk_iter = left_chunks.interpret(); + let right_chunk_iter = right_chunks.interpret(); + let result_chunks = result.typed_data_mut::().iter_mut(); result_chunks - .zip(left_chunks.iter().zip(right_chunks.iter())) + .zip(left_chunk_iter.zip(right_chunk_iter)) .for_each(|(res, (left, right))| { *res = op(left, right); }); - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; result.extend_from_slice(rem); result.freeze() @@ -435,19 +447,21 @@ where let mut result = MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false); - let left_chunks = left.bit_chunks(offset_in_bits, len_in_bits); + let left_slice = left.bit_slice().view(offset_in_bits, len_in_bits); + let left_chunks = left_slice.chunks::(); + + let remainder_bytes = ceil(left_chunks.remainder_bit_len(), 8); + let rem = op(left_chunks.remainder_bits()); + let rem = &rem.to_ne_bytes()[0..remainder_bytes]; + + let left_chunk_iter = left_chunks.interpret(); + let result_chunks = result.typed_data_mut::().iter_mut(); - result_chunks - .zip(left_chunks.iter()) - .for_each(|(res, left)| { - *res = op(left); - }); + result_chunks.zip(left_chunk_iter).for_each(|(res, left)| { + *res = op(left); + }); - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; result.extend_from_slice(rem); result.freeze() diff --git a/rust/arrow/src/compute/kernels/aggregate.rs b/rust/arrow/src/compute/kernels/aggregate.rs index 444e2454a1c..51ac11320c0 100644 --- a/rust/arrow/src/compute/kernels/aggregate.rs +++ b/rust/arrow/src/compute/kernels/aggregate.rs @@ -152,9 +152,21 @@ where let data_chunks = data.chunks_exact(64); let remainder = data_chunks.remainder(); - let bit_chunks = buffer.bit_chunks(array.offset(), array.len()); + let buffer_slice = buffer.bit_slice().view(array.offset(), array.len()); + let buffer_chunks = buffer_slice.chunks::(); + + let buffer_remainder_bits: u64 = buffer_chunks.remainder_bits(); + + let buffer_chunk_iter = buffer_chunks.interpret(); + + remainder.iter().enumerate().for_each(|(i, value)| { + if buffer_remainder_bits & (1 << i) != 0 { + sum = sum + *value; + } + }); + &data_chunks - .zip(bit_chunks.iter()) + .zip(buffer_chunk_iter) .for_each(|(chunk, mask)| { chunk.iter().enumerate().for_each(|(i, value)| { if (mask & (1 << i)) != 0 { @@ -163,14 +175,6 @@ where }); }); - let remainder_bits = bit_chunks.remainder_bits(); - - remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { - sum = sum + *value; - } - }); - Some(sum) } } @@ -216,24 +220,27 @@ where let data_chunks = data.chunks_exact(64); let remainder = data_chunks.remainder(); - let bit_chunks = buffer.bit_chunks(array.offset(), array.len()); + let bit_slice = buffer.bit_slice().view(array.offset(), array.len()); + let bit_chunks = bit_slice.chunks::(); let remainder_bits = bit_chunks.remainder_bits(); - data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let zero = T::init(T::default_value()); - let vecmask = T::mask_from_u64(mask); - let chunk = T::load(&chunk); - let blended = T::mask_select(vecmask, chunk, zero); - - vector_sum = vector_sum + blended; - - mask = mask >> T::lanes(); + data_chunks + .zip(bit_chunks.interpret()) + .for_each(|(chunk, mut mask)| { + // split chunks further into slices corresponding to the vector length + // the compiler is able to unroll this inner loop and remove bounds checks + // since the outer chunk size (64) is always a multiple of the number of lanes + chunk.chunks_exact(T::lanes()).for_each(|chunk| { + let zero = T::init(T::default_value()); + let vecmask = T::mask_from_u64(mask); + let chunk = T::load(&chunk); + let blended = T::mask_select(vecmask, chunk, zero); + + vector_sum = vector_sum + blended; + + mask = mask >> T::lanes(); + }); }); - }); remainder.iter().enumerate().for_each(|(i, value)| { if remainder_bits & (1 << i) != 0 { diff --git a/rust/arrow/src/compute/kernels/boolean.rs b/rust/arrow/src/compute/kernels/boolean.rs index 075ffb0d67b..6a4bff5e3ab 100644 --- a/rust/arrow/src/compute/kernels/boolean.rs +++ b/rust/arrow/src/compute/kernels/boolean.rs @@ -140,7 +140,7 @@ pub fn is_not_null(input: &ArrayRef) -> Result { .with_bitset(len_bytes, true) .freeze() } - Some(buffer) => buffer.bit_slice(input.offset(), len), + Some(buffer) => buffer.bit_view(input.offset(), len), }; let data = diff --git a/rust/arrow/src/compute/util.rs b/rust/arrow/src/compute/util.rs index 3b578011640..dde50e4511c 100644 --- a/rust/arrow/src/compute/util.rs +++ b/rust/arrow/src/compute/util.rs @@ -45,10 +45,10 @@ pub(super) fn combine_option_bitmap( match left { None => match right { None => Ok(None), - Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))), + Some(r) => Ok(Some(r.bit_view(right_offset_in_bits, len_in_bits))), }, Some(l) => match right { - None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))), + None => Ok(Some(l.bit_view(left_offset_in_bits, len_in_bits))), Some(r) => Ok(Some(buffer_bin_and( &l, @@ -78,10 +78,10 @@ pub(super) fn compare_option_bitmap( match left { None => match right { None => Ok(None), - Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))), + Some(r) => Ok(Some(r.bit_view(right_offset_in_bits, len_in_bits))), }, Some(l) => match right { - None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))), + None => Ok(Some(l.bit_view(left_offset_in_bits, len_in_bits))), Some(r) => Ok(Some(buffer_bin_or( &l, diff --git a/rust/arrow/src/util/bit_chunk_iterator.rs b/rust/arrow/src/util/bit_chunk_iterator.rs deleted file mode 100644 index ec10c6f1d41..00000000000 --- a/rust/arrow/src/util/bit_chunk_iterator.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -use crate::buffer::Buffer; -use crate::util::bit_util::ceil; -use std::fmt::Debug; - -#[derive(Debug)] -pub struct BitChunks<'a> { - buffer: &'a Buffer, - raw_data: *const u8, - /// offset inside a byte, guaranteed to be between 0 and 7 (inclusive) - bit_offset: usize, - /// number of complete u64 chunks - chunk_len: usize, - /// number of remaining bits, guaranteed to be between 0 and 63 (inclusive) - remainder_len: usize, -} - -impl<'a> BitChunks<'a> { - pub fn new(buffer: &'a Buffer, offset: usize, len: usize) -> Self { - assert!(ceil(offset + len, 8) <= buffer.len() * 8); - - let byte_offset = offset / 8; - let bit_offset = offset % 8; - - let raw_data = unsafe { buffer.raw_data().add(byte_offset) }; - - let chunk_bits = 8 * std::mem::size_of::(); - - let chunk_len = len / chunk_bits; - let remainder_len = len & (chunk_bits - 1); - - BitChunks::<'a> { - buffer: &buffer, - raw_data, - bit_offset, - chunk_len, - remainder_len, - } - } -} - -#[derive(Debug)] -pub struct BitChunkIterator<'a> { - buffer: &'a Buffer, - raw_data: *const u8, - bit_offset: usize, - chunk_len: usize, - index: usize, -} - -impl<'a> BitChunks<'a> { - /// Returns the number of remaining bits, guaranteed to be between 0 and 63 (inclusive) - #[inline] - pub const fn remainder_len(&self) -> usize { - self.remainder_len - } - - /// Returns the bitmask of remaining bits - #[inline] - pub fn remainder_bits(&self) -> u64 { - let bit_len = self.remainder_len; - if bit_len == 0 { - 0 - } else { - let bit_offset = self.bit_offset; - // number of bytes to read - // might be one more than sizeof(u64) if the offset is in the middle of a byte - let byte_len = ceil(bit_len + bit_offset, 8); - // pointer to remainder bytes after all complete chunks - let base = unsafe { - self.raw_data - .add(self.chunk_len * std::mem::size_of::()) - }; - - let mut bits = unsafe { std::ptr::read(base) } as u64 >> bit_offset; - for i in 1..byte_len { - let byte = unsafe { std::ptr::read(base.add(i)) }; - bits |= (byte as u64) << (i * 8 - bit_offset); - } - - bits & ((1 << bit_len) - 1) - } - } - - /// Returns an iterator over chunks of 64 bits represented as an u64 - #[inline] - pub const fn iter(&self) -> BitChunkIterator<'a> { - BitChunkIterator::<'a> { - buffer: self.buffer, - raw_data: self.raw_data, - bit_offset: self.bit_offset, - chunk_len: self.chunk_len, - index: 0, - } - } -} - -impl<'a> IntoIterator for BitChunks<'a> { - type Item = u64; - type IntoIter = BitChunkIterator<'a>; - - fn into_iter(self) -> Self::IntoIter { - self.iter() - } -} - -impl Iterator for BitChunkIterator<'_> { - type Item = u64; - - #[inline] - fn next(&mut self) -> Option { - let index = self.index; - if index >= self.chunk_len { - return None; - } - - // cast to *const u64 should be fine since we are using read_unaligned below - #[allow(clippy::cast_ptr_alignment)] - let raw_data = self.raw_data as *const u64; - - // bit-packed buffers are stored starting with the least-significant byte first - // so when reading as u64 on a big-endian machine, the bytes need to be swapped - let current = unsafe { std::ptr::read_unaligned(raw_data.add(index)).to_le() }; - - let combined = if self.bit_offset == 0 { - current - } else { - let next = - unsafe { std::ptr::read_unaligned(raw_data.add(index + 1)).to_le() }; - - current >> self.bit_offset - | (next & ((1 << self.bit_offset) - 1)) << (64 - self.bit_offset) - }; - - self.index = index + 1; - - Some(combined) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - ( - self.chunk_len - self.index, - Some(self.chunk_len - self.index), - ) - } -} - -impl ExactSizeIterator for BitChunkIterator<'_> { - #[inline] - fn len(&self) -> usize { - self.chunk_len - self.index - } -} - -#[cfg(test)] -mod tests { - use crate::buffer::Buffer; - - #[test] - fn test_iter_aligned() { - let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(0, 64); - let result = bitchunks.into_iter().collect::>(); - - assert_eq!(vec![0x0706050403020100], result); - } - - #[test] - fn test_iter_unaligned() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(4, 64); - - assert_eq!(0, bitchunks.remainder_len()); - assert_eq!(0, bitchunks.remainder_bits()); - - let result = bitchunks.into_iter().collect::>(); - - assert_eq!( - vec![0b1111010000000010000000010000000010000000010000000010000000010000], - result - ); - } - - #[test] - fn test_iter_unaligned_remainder_1_byte() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(4, 66); - - assert_eq!(2, bitchunks.remainder_len()); - assert_eq!(0b00000011, bitchunks.remainder_bits()); - - let result = bitchunks.into_iter().collect::>(); - - assert_eq!( - vec![0b1111010000000010000000010000000010000000010000000010000000010000], - result - ); - } - - #[test] - fn test_iter_unaligned_remainder_bits_across_bytes() { - let input: &[u8] = &[0b00111111, 0b11111100]; - let buffer: Buffer = Buffer::from(input); - - // remainder contains bits from both bytes - // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes - let bitchunks = buffer.bit_chunks(6, 7); - - assert_eq!(7, bitchunks.remainder_len()); - assert_eq!(0b1110000, bitchunks.remainder_bits()); - } - - #[test] - fn test_iter_unaligned_remainder_bits_large() { - let input: &[u8] = &[ - 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, - 0b11111111, 0b00000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(2, 63); - - assert_eq!(63, bitchunks.remainder_len()); - assert_eq!( - 0b1000000_00111111_11000000_00111111_11000000_00111111_11000000_00111111, - bitchunks.remainder_bits() - ); - } -} diff --git a/rust/arrow/src/util/bit_slice_iterator.rs b/rust/arrow/src/util/bit_slice_iterator.rs new file mode 100644 index 00000000000..0cd09ea4afc --- /dev/null +++ b/rust/arrow/src/util/bit_slice_iterator.rs @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::Buffer; + +use bitvec::prelude::*; +use bitvec::slice::ChunksExact; + +use std::fmt::Debug; + +/// +/// Bit slice representation of buffer data +#[derive(Debug)] +pub struct BufferBitSlice<'a> { + buffer_data: &'a [u8], + bit_slice: &'a BitSlice, +} + +impl<'a> BufferBitSlice<'a> { + /// + /// Creates a bit slice over the given data + #[inline] + pub fn new(buffer_data: &'a [u8]) -> Self { + let bit_slice = BitSlice::::from_slice(buffer_data).unwrap(); + + BufferBitSlice { + buffer_data, + bit_slice, + } + } + + /// + /// Returns immutable view with the given offset in bits and length in bits. + /// This view have zero-copy representation over the actual data. + #[inline] + pub fn view(&self, offset_in_bits: usize, len_in_bits: usize) -> Self { + Self { + buffer_data: self.buffer_data, + bit_slice: &self.bit_slice[offset_in_bits..offset_in_bits + len_in_bits], + } + } + + /// + /// Returns bit chunks in native 64-bit allocation size. + /// Native representations in Arrow follows 64-bit convention. + /// Chunks can still be reinterpreted in any primitive type lower than u64. + #[inline] + pub fn chunks(&self) -> BufferBitChunksExact + where + T: BitMemory, + { + let offset_size_in_bits = 8 * std::mem::size_of::(); + let chunks_exact = self.bit_slice.chunks_exact(offset_size_in_bits); + let remainder_bits = chunks_exact.remainder(); + let remainder: T = if remainder_bits.len() == 0 { + T::default() + } else { + remainder_bits.load::() + }; + BufferBitChunksExact { + chunks_exact, + remainder, + remainder_len_in_bits: remainder_bits.len(), + } + } + + /// + /// Converts the bit view into the Buffer. + /// Buffer is always byte-aligned and well-aligned. + #[inline] + pub fn as_buffer(&self) -> Buffer { + Buffer::from(self.bit_slice.as_slice()) + } +} + +/// +/// Exact chunk view over the bit slice +#[derive(Clone, Debug)] +pub struct BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + chunks_exact: ChunksExact<'a, LocalBits, u8>, + remainder: T, + remainder_len_in_bits: usize, +} + +impl<'a, T> BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + /// + /// Returns remainder bit length from the exact chunk iterator + #[inline] + pub fn remainder_bit_len(&self) -> usize { + self.remainder_len_in_bits + } + + /// + /// Returns the remainder bits interpreted as given type. + #[inline] + pub fn remainder_bits(&self) -> T { + self.remainder + } + + /// + /// Interprets underlying chunk's view's bits as a given type. + #[inline] + pub fn interpret(self) -> impl Iterator + 'a + where + T: BitMemory, + { + self.chunks_exact.map(|e| e.load::()) + } + + /// + /// Returns underlying iterator as it is + #[inline] + pub fn iter(&self) -> &ChunksExact<'a, LocalBits, u8> { + &self.chunks_exact + } +} + +/// +/// Implements consuming iterator for exact chunk iterator +impl<'a, T> IntoIterator for BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + type Item = &'a BitSlice; + type IntoIter = ChunksExact<'a, LocalBits, u8>; + + fn into_iter(self) -> Self::IntoIter { + self.chunks_exact + } +} + +#[cfg(all(test, target_endian = "little"))] +mod tests_bit_slices_little_endian { + use super::*; + use crate::datatypes::ToByteSlice; + + #[test] + fn test_bit_slice_iter_aligned() { + let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice(); + let result = bit_slice.chunks().interpret().collect::>(); + + assert_eq!(vec![0x0706050403020100], result); + } + + #[test] + fn test_bit_slice_iter_unaligned() { + let input: &[u8] = &[ + 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(4, 64); + let chunks = bit_slice.chunks::(); + + assert_eq!(0, chunks.remainder_bit_len()); + assert_eq!(0, chunks.remainder_bits()); + + let result = chunks.interpret().collect::>(); + + assert_eq!( + vec![0b1111_01000000_00100000_00010000_00001000_00000100_00000010_00000001_0000], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_1_byte() { + let input: &[u8] = &[ + 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(4, 66); + let chunks = bit_slice.chunks::(); + + assert_eq!(2, chunks.remainder_bit_len()); + assert_eq!(0b00000011, chunks.remainder_bits()); + + let result = chunks.interpret().collect::>(); + + assert_eq!( + vec![0b1111_01000000_00100000_00010000_00001000_00000100_00000010_00000001_0000], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_across_bytes() { + let input: &[u8] = &[0b00111111, 0b11111100]; + let buffer: Buffer = Buffer::from(input); + + // remainder contains bits from both bytes + // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes + let bit_slice = buffer.bit_slice().view(6, 7); + let chunks = bit_slice.chunks::(); + + assert_eq!(7, chunks.remainder_bit_len()); + assert_eq!(0b1110000, chunks.remainder_bits()); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_large() { + let input: &[u8] = &[ + 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, + 0b11111111, 0b00000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(2, 63); + let chunks = bit_slice.chunks::(); + + assert_eq!(63, chunks.remainder_bit_len()); + assert_eq!( + 0b1000000_00111111_11000000_00111111_11000000_00111111_11000000_00111111, + chunks.remainder_bits() + ); + } + + #[test] + fn test_bit_slice_iter_reinterpret() { + assert_eq!(LocalBits::default(), Lsb0::default()); + let buffer_slice = &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(); + // Name of the bit slice comes from byte slice, since it is still on the stack and behaves similarly to Rust's byte slice. + let buffer = Buffer::from(buffer_slice); + + // Let's get the whole buffer. + let bit_slice = buffer.bit_slice().view(0, buffer_slice.len() * 8); + // Let's also get a chunked bits as u8, not u64 this time... + let chunks = bit_slice.chunks::(); + + let result = chunks.interpret().collect::>(); + assert_eq!(buffer_slice.to_vec(), result); + } +} + +#[cfg(all(test, target_endian = "big"))] +mod tests_bit_slices_big_endian { + use super::*; + use crate::datatypes::ToByteSlice; + + #[test] + fn test_bit_slice_iter_aligned() { + let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice(); + let result = bit_slice.chunks().interpret().collect::>(); + + assert_eq!(vec![0x0001020304050607], result); + } + + #[test] + fn test_bit_slice_iter_unaligned() { + let input: &[u8] = &[ + 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(4, 64); + let chunks = bit_slice.chunks::(); + + assert_eq!(0, chunks.remainder_bit_len()); + assert_eq!(0, chunks.remainder_bits()); + + let result = chunks.interpret().collect::>(); + + assert_eq!( + vec![0b0001_00000010_00000100_00001000_00010000_00100000_01000000_1111], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_1_byte() { + let input: &[u8] = &[ + 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(4, 66); + let chunks = bit_slice.chunks::(); + + assert_eq!(2, chunks.remainder_bit_len()); + assert_eq!(0b00000011, chunks.remainder_bits()); + + let result = chunks.interpret().collect::>(); + + assert_eq!( + vec![0b0001_00000010_00000100_00001000_00010000_00100000_01000000_1111], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_across_bytes() { + let input: &[u8] = &[0b00111111, 0b11111100]; + let buffer: Buffer = Buffer::from(input); + + // remainder contains bits from both bytes + // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes + let bit_slice = buffer.bit_slice().view(6, 7); + let chunks = bit_slice.chunks::(); + + assert_eq!(7, chunks.remainder_bit_len()); + assert_eq!(0b01111111, chunks.remainder_bits()); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_large() { + let input: &[u8] = &[ + 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, + 0b11111111, 0b00000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().view(2, 63); + let chunks = bit_slice.chunks::(); + + assert_eq!(63, chunks.remainder_bit_len()); + assert_eq!( + 0b1111110_00000001_11111110_00000001_11111110_00000001_11111110_00000001, + chunks.remainder_bits() + ); + } + + #[test] + fn test_bit_slice_iter_reinterpret() { + assert_eq!(LocalBits::default(), Msb0::default()); + let buffer_slice = &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(); + // Name of the bit slice comes from byte slice, since it is still on the stack and behaves similarly to Rust's byte slice. + let buffer = Buffer::from(buffer_slice); + + // Let's get the whole buffer. + let bit_slice = buffer.bit_slice().view(0, buffer_slice.len() * 8); + // Let's also get a chunked bits as u8, not u64 this time... + let chunks = bit_slice.chunks::(); + + let result = chunks.interpret().collect::>(); + assert_eq!(buffer_slice.to_vec(), result); + } +} diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs index 053d1329631..c440745fc93 100644 --- a/rust/arrow/src/util/mod.rs +++ b/rust/arrow/src/util/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -pub mod bit_chunk_iterator; +pub mod bit_slice_iterator; pub mod bit_util; pub mod buffered_iterator; pub mod display; diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 8dd7afa1d7e..c6ad1b381eb 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -349,7 +349,7 @@ async fn csv_query_avg_sqrt() -> Result<()> { let mut actual = execute(&mut ctx, sql).await; actual.sort(); let expected = vec![vec!["0.6706002946036462"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -363,7 +363,7 @@ async fn csv_query_custom_udf_with_cast() -> Result<()> { let sql = "SELECT avg(custom_sqrt(c11)) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408483418833"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -377,11 +377,11 @@ async fn sqrt_f32_vs_f64() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408485889435"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); let sql = "SELECT avg(sqrt(CAST(c11 AS double))) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408483418833"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -405,7 +405,7 @@ async fn csv_query_sqrt_sqrt() -> Result<()> { let actual = execute(&mut ctx, sql).await; // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431 let expected = vec![vec!["0.9818650561397431"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -448,7 +448,7 @@ async fn csv_query_avg() -> Result<()> { let mut actual = execute(&mut ctx, sql).await; actual.sort(); let expected = vec![vec!["0.5089725099127211"]]; - assert_eq!(expected, actual); + assert_float_eq(&expected, &actual); Ok(()) } @@ -1399,3 +1399,25 @@ async fn query_on_string_dictionary() -> Result<()> { Ok(()) } + +fn assert_float_eq(expected: &Vec>, received: &Vec>) +where + T: AsRef, +{ + expected + .into_iter() + .flatten() + .zip(received.into_iter().flatten()) + .for_each(|(l, r)| { + let (l, r) = ( + l.as_ref().parse::().unwrap(), + r.as_str().parse::().unwrap(), + ); + // Too small floats are hard to approximate, give them some more error range. + if l < 1.0 && r < 1.0 { + assert!((l - r).abs() <= 2.0_f64 * f64::EPSILON); + } else { + assert!((l - r).abs() <= f64::EPSILON); + } + }); +} From 591aa6b6a2bd469c2a5b4e030556d63c5e73277f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Tue, 10 Nov 2020 23:57:43 +0100 Subject: [PATCH 2/4] Move big endian tests to #8634 (ARROW-10535) --- rust/arrow/src/util/bit_slice_iterator.rs | 109 ---------------------- 1 file changed, 109 deletions(-) diff --git a/rust/arrow/src/util/bit_slice_iterator.rs b/rust/arrow/src/util/bit_slice_iterator.rs index 0cd09ea4afc..e2d2887ef56 100644 --- a/rust/arrow/src/util/bit_slice_iterator.rs +++ b/rust/arrow/src/util/bit_slice_iterator.rs @@ -257,112 +257,3 @@ mod tests_bit_slices_little_endian { assert_eq!(buffer_slice.to_vec(), result); } } - -#[cfg(all(test, target_endian = "big"))] -mod tests_bit_slices_big_endian { - use super::*; - use crate::datatypes::ToByteSlice; - - #[test] - fn test_bit_slice_iter_aligned() { - let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; - let buffer: Buffer = Buffer::from(input); - - let bit_slice = buffer.bit_slice(); - let result = bit_slice.chunks().interpret().collect::>(); - - assert_eq!(vec![0x0001020304050607], result); - } - - #[test] - fn test_bit_slice_iter_unaligned() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bit_slice = buffer.bit_slice().view(4, 64); - let chunks = bit_slice.chunks::(); - - assert_eq!(0, chunks.remainder_bit_len()); - assert_eq!(0, chunks.remainder_bits()); - - let result = chunks.interpret().collect::>(); - - assert_eq!( - vec![0b0001_00000010_00000100_00001000_00010000_00100000_01000000_1111], - result - ); - } - - #[test] - fn test_bit_slice_iter_unaligned_remainder_1_byte() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bit_slice = buffer.bit_slice().view(4, 66); - let chunks = bit_slice.chunks::(); - - assert_eq!(2, chunks.remainder_bit_len()); - assert_eq!(0b00000011, chunks.remainder_bits()); - - let result = chunks.interpret().collect::>(); - - assert_eq!( - vec![0b0001_00000010_00000100_00001000_00010000_00100000_01000000_1111], - result - ); - } - - #[test] - fn test_bit_slice_iter_unaligned_remainder_bits_across_bytes() { - let input: &[u8] = &[0b00111111, 0b11111100]; - let buffer: Buffer = Buffer::from(input); - - // remainder contains bits from both bytes - // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes - let bit_slice = buffer.bit_slice().view(6, 7); - let chunks = bit_slice.chunks::(); - - assert_eq!(7, chunks.remainder_bit_len()); - assert_eq!(0b01111111, chunks.remainder_bits()); - } - - #[test] - fn test_bit_slice_iter_unaligned_remainder_bits_large() { - let input: &[u8] = &[ - 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, - 0b11111111, 0b00000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bit_slice = buffer.bit_slice().view(2, 63); - let chunks = bit_slice.chunks::(); - - assert_eq!(63, chunks.remainder_bit_len()); - assert_eq!( - 0b1111110_00000001_11111110_00000001_11111110_00000001_11111110_00000001, - chunks.remainder_bits() - ); - } - - #[test] - fn test_bit_slice_iter_reinterpret() { - assert_eq!(LocalBits::default(), Msb0::default()); - let buffer_slice = &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(); - // Name of the bit slice comes from byte slice, since it is still on the stack and behaves similarly to Rust's byte slice. - let buffer = Buffer::from(buffer_slice); - - // Let's get the whole buffer. - let bit_slice = buffer.bit_slice().view(0, buffer_slice.len() * 8); - // Let's also get a chunked bits as u8, not u64 this time... - let chunks = bit_slice.chunks::(); - - let result = chunks.interpret().collect::>(); - assert_eq!(buffer_slice.to_vec(), result); - } -} From 6f17aea804f57025229f6c3379e34432c41a1dca Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 11 Nov 2020 00:09:09 +0100 Subject: [PATCH 3/4] Make operations aware of prefetcher behavior --- rust/arrow/src/compute/kernels/aggregate.rs | 12 ++++++------ rust/arrow/src/util/buffered_iterator.rs | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/compute/kernels/aggregate.rs b/rust/arrow/src/compute/kernels/aggregate.rs index 51ac11320c0..e332a644de5 100644 --- a/rust/arrow/src/compute/kernels/aggregate.rs +++ b/rust/arrow/src/compute/kernels/aggregate.rs @@ -159,12 +159,6 @@ where let buffer_chunk_iter = buffer_chunks.interpret(); - remainder.iter().enumerate().for_each(|(i, value)| { - if buffer_remainder_bits & (1 << i) != 0 { - sum = sum + *value; - } - }); - &data_chunks .zip(buffer_chunk_iter) .for_each(|(chunk, mask)| { @@ -175,6 +169,12 @@ where }); }); + remainder.iter().enumerate().for_each(|(i, value)| { + if buffer_remainder_bits & (1 << i) != 0 { + sum = sum + *value; + } + }); + Some(sum) } } diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs index 059b82424a8..5d42ee43e66 100644 --- a/rust/arrow/src/util/buffered_iterator.rs +++ b/rust/arrow/src/util/buffered_iterator.rs @@ -54,7 +54,7 @@ where /// Useful to extract the exact item where an error occurred #[inline] pub fn n(&self) -> usize { - return self.buffer.len(); + self.buffer.len() } } From 54c70560c40fea05afb28bf297fce0b8340cbb4f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 11 Nov 2020 02:54:53 +0100 Subject: [PATCH 4/4] Organize execution --- rust/arrow/src/util/bit_slice_iterator.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/arrow/src/util/bit_slice_iterator.rs b/rust/arrow/src/util/bit_slice_iterator.rs index e2d2887ef56..dd771288165 100644 --- a/rust/arrow/src/util/bit_slice_iterator.rs +++ b/rust/arrow/src/util/bit_slice_iterator.rs @@ -46,7 +46,7 @@ impl<'a> BufferBitSlice<'a> { /// /// Returns immutable view with the given offset in bits and length in bits. /// This view have zero-copy representation over the actual data. - #[inline] + #[inline(always)] pub fn view(&self, offset_in_bits: usize, len_in_bits: usize) -> Self { Self { buffer_data: self.buffer_data, @@ -58,7 +58,7 @@ impl<'a> BufferBitSlice<'a> { /// Returns bit chunks in native 64-bit allocation size. /// Native representations in Arrow follows 64-bit convention. /// Chunks can still be reinterpreted in any primitive type lower than u64. - #[inline] + #[inline(always)] pub fn chunks(&self) -> BufferBitChunksExact where T: BitMemory, @@ -105,21 +105,21 @@ where { /// /// Returns remainder bit length from the exact chunk iterator - #[inline] + #[inline(always)] pub fn remainder_bit_len(&self) -> usize { self.remainder_len_in_bits } /// /// Returns the remainder bits interpreted as given type. - #[inline] + #[inline(always)] pub fn remainder_bits(&self) -> T { self.remainder } /// /// Interprets underlying chunk's view's bits as a given type. - #[inline] + #[inline(always)] pub fn interpret(self) -> impl Iterator + 'a where T: BitMemory, @@ -129,7 +129,7 @@ where /// /// Returns underlying iterator as it is - #[inline] + #[inline(always)] pub fn iter(&self) -> &ChunksExact<'a, LocalBits, u8> { &self.chunks_exact }