From 13c0ace1629daa8dd2f4b3b0f34d7264f26ac680 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 18 Jul 2020 22:00:11 +0100 Subject: [PATCH 01/11] improve filter kernel performance; also implements support for filtering dictionary arrays --- rust/arrow/src/array/data.rs | 1 + rust/arrow/src/buffer.rs | 2 + rust/arrow/src/compute/kernels/filter.rs | 499 ++++++++++++++++++----- rust/arrow/src/util/bit_util.rs | 54 ++- 4 files changed, 441 insertions(+), 115 deletions(-) diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index f8bf5cf7756..ad949460d08 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -151,6 +151,7 @@ impl ArrayData { } /// Returns the offset of this array + #[inline] pub fn offset(&self) -> usize { self.offset } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index ca01f1d150b..33a0af9e4bd 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -545,10 +545,12 @@ impl MutableBuffer { /// /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. + #[inline] pub fn raw_data(&self) -> *const u8 { self.data } + #[inline] pub fn raw_data_mut(&mut self) -> *mut u8 { self.data } diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 52e12cfef19..1d24f17aa4d 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -17,139 +17,395 @@ //! Defines miscellaneous array kernels. 
-use std::sync::Arc; - +use std::{mem, sync::Arc}; +use std::io::Write; use crate::array::*; -use crate::datatypes::{ArrowNumericType, DataType, TimeUnit}; +use crate::datatypes::{DataType, ArrowNumericType, TimeUnit}; use crate::error::{ArrowError, Result}; +use crate::{buffer::{MutableBuffer, Buffer}, bitmap::Bitmap, util::bit_util}; +use crate::record_batch::RecordBatch; -/// Helper function to perform boolean lambda function on values from two arrays. -fn bool_op( - left: &PrimitiveArray, - right: &PrimitiveArray, - op: F, -) -> Result -where - T: ArrowNumericType, - F: Fn(Option, Option) -> bool, -{ - if left.len() != right.len() { - return Err(ArrowError::ComputeError( - "Cannot perform math operation on arrays of different length".to_string(), - )); +/// trait for copying filtered null bitmap bits +trait CopyNullBit { + fn copy_null_bit(&mut self, source_index: usize); + fn null_count(&self) -> usize; + fn null_buffer(&mut self) -> Buffer; +} + +/// no-op null bitmap copy implementation, +/// used when the filtered data array doesn't have a null bitmap +struct NullBitNoop {} + +impl NullBitNoop { + fn new() -> Self { + NullBitNoop {} } - let mut b = BooleanArray::builder(left.len()); - for i in 0..left.len() { - let index = i; - let l = if left.is_null(i) { - None - } else { - Some(left.value(index)) - }; - let r = if right.is_null(i) { - None - } else { - Some(right.value(index)) - }; - b.append_value(op(l, r))?; +} + +impl CopyNullBit for NullBitNoop { + #[inline] + fn copy_null_bit(&mut self, _source_index: usize) { + // do nothing } - Ok(b.finish()) + + fn null_count(&self) -> usize { 0 } + + fn null_buffer(&mut self) -> Buffer { Buffer::from([0u8; 0]) } } -macro_rules! 
filter_array { - ($array:expr, $filter:expr, $array_type:ident) => {{ - let b = $array.as_any().downcast_ref::<$array_type>().unwrap(); - let mut builder = $array_type::builder(b.len()); - for i in 0..b.len() { - if $filter.value(i) { - if b.is_null(i) { - builder.append_null()?; - } else { - builder.append_value(b.value(i))?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; +/// null bitmap copy implementation, +/// used when the filtered data array has a null bitmap +struct NullBitSetter<'a> { + target_buffer: MutableBuffer, + source_bytes: &'a [u8], + target_index: usize, + null_count: usize } -/// Returns the array, taking only the elements matching the filter -pub fn filter(array: &Array, filter: &BooleanArray) -> Result { - match array.data_type() { - DataType::UInt8 => filter_array!(array, filter, UInt8Array), - DataType::UInt16 => filter_array!(array, filter, UInt16Array), - DataType::UInt32 => filter_array!(array, filter, UInt32Array), - DataType::UInt64 => filter_array!(array, filter, UInt64Array), - DataType::Int8 => filter_array!(array, filter, Int8Array), - DataType::Int16 => filter_array!(array, filter, Int16Array), - DataType::Int32 => filter_array!(array, filter, Int32Array), - DataType::Int64 => filter_array!(array, filter, Int64Array), - DataType::Float32 => filter_array!(array, filter, Float32Array), - DataType::Float64 => filter_array!(array, filter, Float64Array), - DataType::Boolean => filter_array!(array, filter, BooleanArray), - DataType::Date32(_) => filter_array!(array, filter, Date32Array), - DataType::Date64(_) => filter_array!(array, filter, Date64Array), - DataType::Time32(TimeUnit::Second) => { - filter_array!(array, filter, Time32SecondArray) - } - DataType::Time32(TimeUnit::Millisecond) => { - filter_array!(array, filter, Time32MillisecondArray) - } - DataType::Time64(TimeUnit::Microsecond) => { - filter_array!(array, filter, Time64MicrosecondArray) - } - DataType::Time64(TimeUnit::Nanosecond) => { - filter_array!(array, filter, 
Time64NanosecondArray) - } - DataType::Duration(TimeUnit::Second) => { - filter_array!(array, filter, DurationSecondArray) - } - DataType::Duration(TimeUnit::Millisecond) => { - filter_array!(array, filter, DurationMillisecondArray) - } - DataType::Duration(TimeUnit::Microsecond) => { - filter_array!(array, filter, DurationMicrosecondArray) - } - DataType::Duration(TimeUnit::Nanosecond) => { - filter_array!(array, filter, DurationNanosecondArray) +impl<'a> NullBitSetter<'a> { + fn new(null_bitmap: &'a Bitmap) -> Self { + let null_bytes = null_bitmap.buffer_ref().data(); + // create null bitmap buffer with same length and initialize null bitmap buffer to 1s + let null_buffer = MutableBuffer::new(null_bytes.len()) + .with_bitset(null_bytes.len(), true); + NullBitSetter { + source_bytes: null_bytes, + target_buffer: null_buffer, + target_index: 0, + null_count: 0 } - DataType::Timestamp(TimeUnit::Second, _) => { - filter_array!(array, filter, TimestampSecondArray) + } +} + +impl<'a> CopyNullBit for NullBitSetter<'a> { + #[inline] + fn copy_null_bit(&mut self, source_index: usize) { + if bit_util::get_bit(self.source_bytes, source_index) == false { + // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len() + unsafe { bit_util::unset_bit_raw(self.target_buffer.raw_data_mut(), self.target_index); } + self.null_count += 1; } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - filter_array!(array, filter, TimestampMillisecondArray) + self.target_index += 1; + } + + fn null_count(&self) -> usize { self.null_count } + + fn null_buffer(&mut self) -> Buffer { + self.target_buffer.resize(self.target_index).unwrap(); + // use mem::replace to detach self.target_buffer from self so that it can be returned + let target_buffer = mem::replace(&mut self.target_buffer, MutableBuffer::new(0)); + target_buffer.freeze() + } +} + +fn get_null_bit_setter<'a>(data_array: &'a impl Array ) -> Box { + if let Some(null_bitmap) = 
data_array.data_ref().null_bitmap() { + // only return an actual null bit copy implementation if null_bitmap is set + Box::new(NullBitSetter::new(null_bitmap)) + } + else { + // otherwise return a no-op copy null bit implementation + // for improved performance when the filtered array doesn't contain NULLs + Box::new(NullBitNoop::new()) + } +} + +// transmute filter array to u64 +// - optimize filtering using highly selective filters by skipping entire batches of 64 filter bits +// - copy null bit immediately +// - copy data values in batches +fn filter_array_impl(filter_context: &FilterContext, data_array: &impl Array, array_type: DataType, value_size: usize) -> Result +{ + let filtered_count = filter_context.filtered_count; + let filter_mask = &filter_context.filter_mask; + let filter_u64 = &filter_context.filter_u64; + let data_bytes = data_array.data_ref().buffers()[0].data(); + let mut value_buffer = MutableBuffer::new(filtered_count * value_size); + let mut null_bit_setter = get_null_bit_setter(data_array); + let null_bit_setter = null_bit_setter.as_mut(); + let mut copy_buffer: Vec = vec![0; 64 * value_size]; + + for i in 0..filter_u64.len() { + // foreach u64 batch + let filter_batch = filter_u64[i]; + if filter_batch == 0 { + // if batch == 0: skip + continue; } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - filter_array!(array, filter, TimestampMicrosecondArray) + + let mut copy_count = 0; + for j in 0..64 { + // foreach bit in batch: + if (filter_batch & filter_mask[j]) != 0 { + let data_index = (i * 64) + j; + null_bit_setter.copy_null_bit(data_index); + // if filter bit == 1: copy data value to temp array + // let copy_buffer_start = value_size * copy_count; + // let data_buffer_start = value_size * data_index; + copy_buffer[(value_size * copy_count)..((value_size * copy_count) + value_size)] + .copy_from_slice(&data_bytes[(value_size * data_index)..((value_size * data_index) + value_size)]); + copy_count += 1; + } } - 
DataType::Timestamp(TimeUnit::Nanosecond, _) => { - filter_array!(array, filter, TimestampNanosecondArray) + } + // copy temp array to output buffer + value_buffer.write(&copy_buffer[0..(value_size * copy_count)])?; + } + + let mut array_data_builder = ArrayDataBuilder::new(array_type) + .len(filtered_count) + .add_buffer(value_buffer.freeze()); + if null_bit_setter.null_count() > 0 { + array_data_builder = array_data_builder + .null_count(null_bit_setter.null_count()) + .null_bit_buffer(null_bit_setter.null_buffer()); + } + + Ok(array_data_builder) +} + +/// FilterContext can be used to improve performance when +/// filtering multiple data arrays with the same filter array. +#[derive(Debug)] +pub struct FilterContext { + filter_u64: Vec, + filtered_count: usize, + filter_mask: Vec +} + +macro_rules! filter_primitive_array { + ($context:expr, $array:expr, $array_type:ident) => {{ + let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap(); + let output_array = $context.filter_primitive_array(input_array)?; + Ok(Arc::new(output_array)) + }}; +} + +macro_rules! 
filter_dictionary_array { + ($context:expr, $array:expr, $array_type:ident) => {{ + let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap(); + let output_array = $context.filter_dictionary_array(input_array)?; + Ok(Arc::new(output_array)) + }}; +} + +impl FilterContext { + /// Returns a new instance of FilterContext + pub fn new(filter_array: &BooleanArray) -> Self { + let filter_mask: Vec = (0..64).map(|x| 1u64 << x).collect(); + let filter_bytes = filter_array.data_ref().buffers()[0].data(); + let filtered_count = bit_util::count_set_bits(filter_bytes); + // transmute filter_bytes to &[u64] + let mut u64_buffer = MutableBuffer::new(filter_bytes.len()); + u64_buffer.write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len()).unwrap(); + let filter_u64 = u64_buffer.typed_data_mut::().to_owned(); + FilterContext { + filter_u64, + filtered_count, + filter_mask } - DataType::Binary => { - let b = array.as_any().downcast_ref::().unwrap(); - let mut values: Vec<&[u8]> = Vec::with_capacity(b.len()); - for i in 0..b.len() { - if filter.value(i) { - values.push(b.value(i)); + } + + /// Returns a new array, containing only the elements matching the filter + pub fn filter(&self, array: &Array) -> Result { + match array.data_type() { + DataType::UInt8 => filter_primitive_array!(self, array, UInt8Array), + DataType::UInt16 => filter_primitive_array!(self, array, UInt16Array), + DataType::UInt32 => filter_primitive_array!(self, array, UInt32Array), + DataType::UInt64 => filter_primitive_array!(self, array, UInt64Array), + DataType::Int8 => filter_primitive_array!(self, array, Int8Array), + DataType::Int16 => filter_primitive_array!(self, array, Int16Array), + DataType::Int32 => filter_primitive_array!(self, array, Int32Array), + DataType::Int64 => filter_primitive_array!(self, array, Int64Array), + DataType::Float32 => filter_primitive_array!(self, array, Float32Array), + DataType::Float64 => filter_primitive_array!(self, array, Float64Array), + 
DataType::Boolean => { + let input_array = array.as_any().downcast_ref::().unwrap(); + let mut builder = BooleanArray::builder(self.filtered_count); + for i in 0..self.filter_u64.len() { + // foreach u64 batch + let filter_batch = self.filter_u64[i]; + if filter_batch == 0 { + // if batch == 0: skip + continue; + } + for j in 0..64 { + // foreach bit in batch: + if (filter_batch & self.filter_mask[j]) != 0 { + let data_index = (i * 64) + j; + if input_array.is_null(data_index) { + builder.append_null()?; + } else { + builder.append_value(input_array.value(data_index))?; + } + } + } } + Ok(Arc::new(builder.finish())) + }, + DataType::Date32(_) => filter_primitive_array!(self, array, Date32Array), + DataType::Date64(_) => filter_primitive_array!(self, array, Date64Array), + DataType::Time32(TimeUnit::Second) => { + filter_primitive_array!(self, array, Time32SecondArray) } - Ok(Arc::new(BinaryArray::from(values))) - } - DataType::Utf8 => { - let b = array.as_any().downcast_ref::().unwrap(); - let mut values: Vec<&str> = Vec::with_capacity(b.len()); - for i in 0..b.len() { - if filter.value(i) { - values.push(b.value(i)); + DataType::Time32(TimeUnit::Millisecond) => { + filter_primitive_array!(self, array, Time32MillisecondArray) + } + DataType::Time64(TimeUnit::Microsecond) => { + filter_primitive_array!(self, array, Time64MicrosecondArray) + } + DataType::Time64(TimeUnit::Nanosecond) => { + filter_primitive_array!(self, array, Time64NanosecondArray) + } + DataType::Duration(TimeUnit::Second) => { + filter_primitive_array!(self, array, DurationSecondArray) + } + DataType::Duration(TimeUnit::Millisecond) => { + filter_primitive_array!(self, array, DurationMillisecondArray) + } + DataType::Duration(TimeUnit::Microsecond) => { + filter_primitive_array!(self, array, DurationMicrosecondArray) + } + DataType::Duration(TimeUnit::Nanosecond) => { + filter_primitive_array!(self, array, DurationNanosecondArray) + } + DataType::Timestamp(TimeUnit::Second, _) => { + 
filter_primitive_array!(self, array, TimestampSecondArray) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + filter_primitive_array!(self, array, TimestampMillisecondArray) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + filter_primitive_array!(self, array, TimestampMicrosecondArray) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + filter_primitive_array!(self, array, TimestampNanosecondArray) + } + DataType::Binary => { + let input_array = array.as_any().downcast_ref::().unwrap(); + let mut values: Vec<&[u8]> = Vec::with_capacity(self.filtered_count); + for i in 0..self.filter_u64.len() { + // foreach u64 batch + let filter_batch = self.filter_u64[i]; + if filter_batch == 0 { + // if batch == 0: skip + continue; + } + for j in 0..64 { + // foreach bit in batch: + if (filter_batch & self.filter_mask[j]) != 0 { + let data_index = (i * 64) + j; + values.push(input_array.value(data_index)); + } + } + } + Ok(Arc::new(BinaryArray::from(values))) + } + DataType::Utf8 => { + let input_array = array.as_any().downcast_ref::().unwrap(); + let mut values: Vec<&str> = Vec::with_capacity(self.filtered_count); + for i in 0..self.filter_u64.len() { + // foreach u64 batch + let filter_batch = self.filter_u64[i]; + if filter_batch == 0 { + // if batch == 0: skip + continue; + } + for j in 0..64 { + // foreach bit in batch: + if (filter_batch & self.filter_mask[j]) != 0 { + let data_index = (i * 64) + j; + values.push(input_array.value(data_index)); + } + } + } + Ok(Arc::new(StringArray::from(values))) + } + DataType::Dictionary(ref key_type, ref value_type) => match (key_type.as_ref(), value_type.as_ref()) { + (key_type, DataType::Utf8) => match key_type { + DataType::UInt8 => filter_dictionary_array!(self, array, UInt8DictionaryArray), + DataType::UInt16 => filter_dictionary_array!(self, array, UInt16DictionaryArray), + DataType::UInt32 => filter_dictionary_array!(self, array, UInt32DictionaryArray), + DataType::UInt64 => filter_dictionary_array!(self, 
array, UInt64DictionaryArray), + DataType::Int8 => filter_dictionary_array!(self, array, Int8DictionaryArray), + DataType::Int16 => filter_dictionary_array!(self, array, Int16DictionaryArray), + DataType::Int32 => filter_dictionary_array!(self, array, Int32DictionaryArray), + DataType::Int64 => filter_dictionary_array!(self, array, Int64DictionaryArray), + other => Err(ArrowError::ComputeError(format!( + "filter not supported for string dictionary with key of type {:?}", + other + ))) } + (key_type, value_type) => Err(ArrowError::ComputeError(format!( + "filter not supported for Dictionary({:?}, {:?})", + key_type, value_type + ))) } - Ok(Arc::new(StringArray::from(values))) + other => Err(ArrowError::ComputeError(format!( + "filter not supported for {:?}", + other + ))), } - other => Err(ArrowError::ComputeError(format!( - "filter not supported for {:?}", - other - ))), } + + /// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, + /// selected by the BooleanArray passed as the filter_array parameter + pub fn filter_primitive_array(&self, data_array: &PrimitiveArray) -> Result> + where + T: ArrowNumericType + { + let array_type = T::get_data_type(); + let value_size = mem::size_of::(); + let array_data_builder = filter_array_impl(self, data_array, array_type, value_size)?; + let data = array_data_builder.build(); + Ok(PrimitiveArray::::from(data)) + } + + /// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, + /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array. 
+ pub fn filter_dictionary_array(&self, data_array: &DictionaryArray) -> Result> + where + T: ArrowNumericType + { + let array_type = data_array.data_type().clone(); + let value_size = mem::size_of::(); + let mut array_data_builder = filter_array_impl(self, data_array, array_type, value_size)?; + // copy dictionary values from input array + array_data_builder = array_data_builder.add_child_data(data_array.values().data()); + let data = array_data_builder.build(); + Ok(DictionaryArray::::from(data)) + } +} + +/// Returns a new array, containing only the elements matching the filter. +pub fn filter(array: &Array, filter: &BooleanArray) -> Result { + FilterContext::new(filter).filter(array) +} + +/// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, +/// selected by the BooleanArray passed as the filter_array parameter +pub fn filter_primitive_array(data_array: &PrimitiveArray, filter_array: &BooleanArray) -> Result> +where + T: ArrowNumericType +{ + FilterContext::new(filter_array).filter_primitive_array(data_array) +} + +/// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, +/// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array. +pub fn filter_dictionary_array(data_array: &DictionaryArray, filter_array: &BooleanArray) -> Result> +where + T: ArrowNumericType +{ + FilterContext::new(filter_array).filter_dictionary_array(data_array) +} + +/// Returns a new RecordBatch with arrays containing only values matching the filter. +/// The same FilterContext is re-used when filtering arrays in the RecordBatch for better performance. 
+pub fn filter_record_batch(record_batch: &RecordBatch, filter_array: &BooleanArray) -> Result { + let filter_context = FilterContext::new(filter_array); + let filtered_arrays = record_batch.columns().iter() + .map(|a| filter_context.filter(a.as_ref()).unwrap()).collect(); + RecordBatch::try_new(record_batch.schema().clone(), filtered_arrays) } #[cfg(test)] @@ -273,4 +529,21 @@ mod tests { assert_eq!(1, d.len()); assert_eq!(true, d.is_null(0)); } + + #[test] + fn test_filter_dictionary_array() { + let values = vec![Some("hello"), None, Some("world"), Some("!")]; + let a: Int8DictionaryArray = values.iter().map(|&x| x).collect(); + let b = BooleanArray::from(vec![false, true, true, false]); + let c = filter(&a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + let value_array = d.values(); + let values = value_array.as_any().downcast_ref::().unwrap(); + // values are cloned in the filtered dictionary array + assert_eq!(3, values.len()); + // but keys are filtered + assert_eq!(2, d.len()); + assert_eq!(true, d.is_null(0)); + assert_eq!("world", values.value(d.keys().nth(1).unwrap().unwrap() as usize)); + } } diff --git a/rust/arrow/src/util/bit_util.rs b/rust/arrow/src/util/bit_util.rs index a2ada2c0323..59244f69359 100644 --- a/rust/arrow/src/util/bit_util.rs +++ b/rust/arrow/src/util/bit_util.rs @@ -68,7 +68,7 @@ pub unsafe fn get_bit_raw(data: *const u8, i: usize) -> bool { /// Sets bit at position `i` for `data` #[inline] pub fn set_bit(data: &mut [u8], i: usize) { - data[i >> 3] |= BIT_MASK[i & 7] + data[i >> 3] |= BIT_MASK[i & 7]; } /// Sets bit at position `i` for `data` @@ -79,7 +79,22 @@ pub fn set_bit(data: &mut [u8], i: usize) { /// responsible to guarantee that `i` is within bounds. 
#[inline] pub unsafe fn set_bit_raw(data: *mut u8, i: usize) { - *data.add(i >> 3) |= BIT_MASK[i & 7] + *data.add(i >> 3) |= BIT_MASK[i & 7]; +} + +/// Sets bit at position `i` for `data` to 0 +#[inline] +pub fn unset_bit(data: &mut [u8], i: usize) { + data[i >> 3] ^= BIT_MASK[i & 7]; +} + +/// Sets bit at position `i` for `data` to 0 +/// +/// Note this doesn't do any bound checking, for performance reason. The caller is +/// responsible to guarantee that `i` is within bounds. +#[inline] +pub unsafe fn unset_bit_raw(data: *mut u8, i: usize) { + *data.add(i >> 3) ^= BIT_MASK[i & 7]; } /// Sets bits in the non-inclusive range `start..end` for `data` @@ -257,6 +272,17 @@ mod tests { assert_eq!([0b00100101], b); } + #[test] + fn test_unset_bit() { + let mut b = [0b11111111]; + unset_bit(&mut b, 0); + assert_eq!([0b11111110], b); + unset_bit(&mut b, 2); + assert_eq!([0b11111010], b); + unset_bit(&mut b, 5); + assert_eq!([0b11011010], b); + } + #[test] fn test_set_bit_raw() { const NUM_BYTE: usize = 10; @@ -281,6 +307,30 @@ mod tests { } } + #[test] + fn test_unset_bit_raw() { + const NUM_BYTE: usize = 10; + let mut buf = vec![255; NUM_BYTE]; + let mut expected = vec![]; + let mut rng = thread_rng(); + for i in 0..8 * NUM_BYTE { + let b = rng.gen_bool(0.5); + expected.push(b); + if !b { + unsafe { + unset_bit_raw(buf.as_mut_ptr(), i); + } + } + } + + let raw_ptr = buf.as_ptr(); + for (i, b) in expected.iter().enumerate() { + unsafe { + assert_eq!(*b, get_bit_raw(raw_ptr, i)); + } + } + } + #[test] fn test_set_bits_raw() { const NUM_BYTE: usize = 64; From a5bf99484bd32fb815f5c2cf7a9402798a4e92fc Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 18 Jul 2020 22:08:03 +0100 Subject: [PATCH 02/11] fix rustfmt issues --- rust/arrow/src/compute/kernels/filter.rs | 150 +++++++++++++++-------- 1 file changed, 101 insertions(+), 49 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 1d24f17aa4d..0e3a8ec07b2 
100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -17,13 +17,17 @@ //! Defines miscellaneous array kernels. -use std::{mem, sync::Arc}; -use std::io::Write; use crate::array::*; -use crate::datatypes::{DataType, ArrowNumericType, TimeUnit}; +use crate::datatypes::{ArrowNumericType, DataType, TimeUnit}; use crate::error::{ArrowError, Result}; -use crate::{buffer::{MutableBuffer, Buffer}, bitmap::Bitmap, util::bit_util}; use crate::record_batch::RecordBatch; +use crate::{ + bitmap::Bitmap, + buffer::{Buffer, MutableBuffer}, + util::bit_util, +}; +use std::io::Write; +use std::{mem, sync::Arc}; /// trait for copying filtered null bitmap bits trait CopyNullBit { @@ -32,7 +36,7 @@ trait CopyNullBit { fn null_buffer(&mut self) -> Buffer; } -/// no-op null bitmap copy implementation, +/// no-op null bitmap copy implementation, /// used when the filtered data array doesn't have a null bitmap struct NullBitNoop {} @@ -48,31 +52,35 @@ impl CopyNullBit for NullBitNoop { // do nothing } - fn null_count(&self) -> usize { 0 } + fn null_count(&self) -> usize { + 0 + } - fn null_buffer(&mut self) -> Buffer { Buffer::from([0u8; 0]) } + fn null_buffer(&mut self) -> Buffer { + Buffer::from([0u8; 0]) + } } -/// null bitmap copy implementation, +/// null bitmap copy implementation, /// used when the filtered data array has a null bitmap struct NullBitSetter<'a> { target_buffer: MutableBuffer, source_bytes: &'a [u8], target_index: usize, - null_count: usize + null_count: usize, } impl<'a> NullBitSetter<'a> { fn new(null_bitmap: &'a Bitmap) -> Self { let null_bytes = null_bitmap.buffer_ref().data(); // create null bitmap buffer with same length and initialize null bitmap buffer to 1s - let null_buffer = MutableBuffer::new(null_bytes.len()) - .with_bitset(null_bytes.len(), true); + let null_buffer = + MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(), true); NullBitSetter { source_bytes: null_bytes, target_buffer: 
null_buffer, target_index: 0, - null_count: 0 + null_count: 0, } } } @@ -82,13 +90,20 @@ impl<'a> CopyNullBit for NullBitSetter<'a> { fn copy_null_bit(&mut self, source_index: usize) { if bit_util::get_bit(self.source_bytes, source_index) == false { // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len() - unsafe { bit_util::unset_bit_raw(self.target_buffer.raw_data_mut(), self.target_index); } + unsafe { + bit_util::unset_bit_raw( + self.target_buffer.raw_data_mut(), + self.target_index, + ); + } self.null_count += 1; } self.target_index += 1; } - fn null_count(&self) -> usize { self.null_count } + fn null_count(&self) -> usize { + self.null_count + } fn null_buffer(&mut self) -> Buffer { self.target_buffer.resize(self.target_index).unwrap(); @@ -98,12 +113,11 @@ impl<'a> CopyNullBit for NullBitSetter<'a> { } } -fn get_null_bit_setter<'a>(data_array: &'a impl Array ) -> Box { +fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box { if let Some(null_bitmap) = data_array.data_ref().null_bitmap() { // only return an actual null bit copy implementation if null_bitmap is set Box::new(NullBitSetter::new(null_bitmap)) - } - else { + } else { // otherwise return a no-op copy null bit implementation // for improved performance when the filtered array doesn't contain NULLs Box::new(NullBitNoop::new()) @@ -112,10 +126,14 @@ fn get_null_bit_setter<'a>(data_array: &'a impl Array ) -> Box // transmute filter array to u64 // - optimize filtering using highly selective filters by skipping entire batches of 64 filter bits -// - copy null bit immediately +// - copy null bit immediately // - copy data values in batches -fn filter_array_impl(filter_context: &FilterContext, data_array: &impl Array, array_type: DataType, value_size: usize) -> Result -{ +fn filter_array_impl( + filter_context: &FilterContext, + data_array: &impl Array, + array_type: DataType, + value_size: usize, +) -> Result { let filtered_count = 
filter_context.filtered_count; let filter_mask = &filter_context.filter_mask; let filter_u64 = &filter_context.filter_u64; @@ -124,7 +142,7 @@ fn filter_array_impl(filter_context: &FilterContext, data_array: &impl Array, ar let mut null_bit_setter = get_null_bit_setter(data_array); let null_bit_setter = null_bit_setter.as_mut(); let mut copy_buffer: Vec = vec![0; 64 * value_size]; - + for i in 0..filter_u64.len() { // foreach u64 batch let filter_batch = filter_u64[i]; @@ -142,15 +160,19 @@ fn filter_array_impl(filter_context: &FilterContext, data_array: &impl Array, ar // if filter bit == 1: copy data value to temp array // let copy_buffer_start = value_size * copy_count; // let data_buffer_start = value_size * data_index; - copy_buffer[(value_size * copy_count)..((value_size * copy_count) + value_size)] - .copy_from_slice(&data_bytes[(value_size * data_index)..((value_size * data_index) + value_size)]); + copy_buffer + [(value_size * copy_count)..((value_size * copy_count) + value_size)] + .copy_from_slice( + &data_bytes[(value_size * data_index) + ..((value_size * data_index) + value_size)], + ); copy_count += 1; } } // copy temp array to output buffer value_buffer.write(&copy_buffer[0..(value_size * copy_count)])?; } - + let mut array_data_builder = ArrayDataBuilder::new(array_type) .len(filtered_count) .add_buffer(value_buffer.freeze()); @@ -163,13 +185,13 @@ fn filter_array_impl(filter_context: &FilterContext, data_array: &impl Array, ar Ok(array_data_builder) } -/// FilterContext can be used to improve performance when +/// FilterContext can be used to improve performance when /// filtering multiple data arrays with the same filter array. #[derive(Debug)] pub struct FilterContext { filter_u64: Vec, filtered_count: usize, - filter_mask: Vec + filter_mask: Vec, } macro_rules! 
filter_primitive_array { @@ -196,12 +218,14 @@ impl FilterContext { let filtered_count = bit_util::count_set_bits(filter_bytes); // transmute filter_bytes to &[u64] let mut u64_buffer = MutableBuffer::new(filter_bytes.len()); - u64_buffer.write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len()).unwrap(); + u64_buffer + .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len()) + .unwrap(); let filter_u64 = u64_buffer.typed_data_mut::().to_owned(); FilterContext { filter_u64, filtered_count, - filter_mask + filter_mask, } } @@ -347,30 +371,39 @@ impl FilterContext { } } - /// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, + /// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, /// selected by the BooleanArray passed as the filter_array parameter - pub fn filter_primitive_array(&self, data_array: &PrimitiveArray) -> Result> + pub fn filter_primitive_array( + &self, + data_array: &PrimitiveArray, + ) -> Result> where - T: ArrowNumericType + T: ArrowNumericType, { let array_type = T::get_data_type(); let value_size = mem::size_of::(); - let array_data_builder = filter_array_impl(self, data_array, array_type, value_size)?; + let array_data_builder = + filter_array_impl(self, data_array, array_type, value_size)?; let data = array_data_builder.build(); Ok(PrimitiveArray::::from(data)) } - /// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, + /// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array. 
- pub fn filter_dictionary_array(&self, data_array: &DictionaryArray) -> Result> + pub fn filter_dictionary_array( + &self, + data_array: &DictionaryArray, + ) -> Result> where - T: ArrowNumericType + T: ArrowNumericType, { let array_type = data_array.data_type().clone(); let value_size = mem::size_of::(); - let mut array_data_builder = filter_array_impl(self, data_array, array_type, value_size)?; + let mut array_data_builder = + filter_array_impl(self, data_array, array_type, value_size)?; // copy dictionary values from input array - array_data_builder = array_data_builder.add_child_data(data_array.values().data()); + array_data_builder = + array_data_builder.add_child_data(data_array.values().data()); let data = array_data_builder.build(); Ok(DictionaryArray::::from(data)) } @@ -381,30 +414,42 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result { FilterContext::new(filter).filter(array) } -/// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, +/// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, /// selected by the BooleanArray passed as the filter_array parameter -pub fn filter_primitive_array(data_array: &PrimitiveArray, filter_array: &BooleanArray) -> Result> +pub fn filter_primitive_array( + data_array: &PrimitiveArray, + filter_array: &BooleanArray, +) -> Result> where - T: ArrowNumericType + T: ArrowNumericType, { FilterContext::new(filter_array).filter_primitive_array(data_array) } -/// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, +/// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array. 
-pub fn filter_dictionary_array(data_array: &DictionaryArray, filter_array: &BooleanArray) -> Result> +pub fn filter_dictionary_array( + data_array: &DictionaryArray, + filter_array: &BooleanArray, +) -> Result> where - T: ArrowNumericType + T: ArrowNumericType, { FilterContext::new(filter_array).filter_dictionary_array(data_array) } -/// Returns a new RecordBatch with arrays containing only values matching the filter. +/// Returns a new RecordBatch with arrays containing only values matching the filter. /// The same FilterContext is re-used when filtering arrays in the RecordBatch for better performance. -pub fn filter_record_batch(record_batch: &RecordBatch, filter_array: &BooleanArray) -> Result { +pub fn filter_record_batch( + record_batch: &RecordBatch, + filter_array: &BooleanArray, +) -> Result { let filter_context = FilterContext::new(filter_array); - let filtered_arrays = record_batch.columns().iter() - .map(|a| filter_context.filter(a.as_ref()).unwrap()).collect(); + let filtered_arrays = record_batch + .columns() + .iter() + .map(|a| filter_context.filter(a.as_ref()).unwrap()) + .collect(); RecordBatch::try_new(record_batch.schema().clone(), filtered_arrays) } @@ -536,7 +581,11 @@ mod tests { let a: Int8DictionaryArray = values.iter().map(|&x| x).collect(); let b = BooleanArray::from(vec![false, true, true, false]); let c = filter(&a, &b).unwrap(); - let d = c.as_ref().as_any().downcast_ref::().unwrap(); + let d = c + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); let value_array = d.values(); let values = value_array.as_any().downcast_ref::().unwrap(); // values are cloned in the filtered dictionary array @@ -544,6 +593,9 @@ mod tests { // but keys are filtered assert_eq!(2, d.len()); assert_eq!(true, d.is_null(0)); - assert_eq!("world", values.value(d.keys().nth(1).unwrap().unwrap() as usize)); + assert_eq!( + "world", + values.value(d.keys().nth(1).unwrap().unwrap() as usize) + ); } } From 17a4e4327b16c4af775638867de23a8f5e5da253 Mon Sep 17 
00:00:00 2001 From: Yordan Pavlov Date: Sat, 18 Jul 2020 22:30:43 +0100 Subject: [PATCH 03/11] add filter kernel benchmarks --- rust/arrow/Cargo.toml | 4 + rust/arrow/benches/filter_kernels.rs | 118 +++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 rust/arrow/benches/filter_kernels.rs diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index dac331b44dd..b92fef3341e 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -88,6 +88,10 @@ harness = false name = "comparison_kernels" harness = false +[[bench]] +name = "filter_kernels" +harness = false + [[bench]] name = "take_kernels" harness = false diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs new file mode 100644 index 00000000000..f96584f5340 --- /dev/null +++ b/rust/arrow/benches/filter_kernels.rs @@ -0,0 +1,118 @@ +use arrow::array::*; +use arrow::compute::{filter, FilterContext}; +use arrow::datatypes::ArrowNumericType; +use criterion::{criterion_group, criterion_main, Criterion}; + +fn create_primitive_array(size: usize, value_fn: F) -> PrimitiveArray +where + T: ArrowNumericType, + F: Fn(usize) -> T::Native, +{ + let mut builder = PrimitiveArray::::builder(size); + for i in 0..size { + builder.append_value(value_fn(i)).unwrap(); + } + builder.finish() +} + +fn create_u8_array_with_nulls(size: usize) -> UInt8Array { + let mut builder = UInt8Builder::new(size); + for i in 0..size { + if i % 2 == 0 { + builder.append_value(1).unwrap(); + } else { + builder.append_null().unwrap(); + } + } + builder.finish() +} + +fn create_bool_array(size: usize, value_fn: F) -> BooleanArray +where + F: Fn(usize) -> bool, +{ + let mut builder = BooleanBuilder::new(size); + for i in 0..size { + builder.append_value(value_fn(i)).unwrap(); + } + builder.finish() +} + +fn bench_filter_u8(data_array: &UInt8Array, filter_array: &BooleanArray) { + filter( + criterion::black_box(data_array), + criterion::black_box(filter_array), + ) + .unwrap(); +} 
+ +// fn bench_filter_f32(data_array: &Float32Array, filter_array: &BooleanArray) { +// filter(criterion::black_box(data_array), criterion::black_box(filter_array)).unwrap(); +// } + +fn bench_filter_context_u8(data_array: &UInt8Array, filter_context: &FilterContext) { + filter_context + .filter(criterion::black_box(data_array)) + .unwrap(); +} + +fn bench_filter_context_f32(data_array: &Float32Array, filter_context: &FilterContext) { + filter_context + .filter(criterion::black_box(data_array)) + .unwrap(); +} + +fn add_benchmark(c: &mut Criterion) { + let size = 65536; + let filter_array = create_bool_array(size, |i| match i % 2 { + 0 => true, + _ => false, + }); + let sparse_filter_array = create_bool_array(size, |i| match i % 8000 { + 0 => true, + _ => false, + }); + + let filter_context = FilterContext::new(&filter_array); + let sparse_filter_context = FilterContext::new(&sparse_filter_array); + + let data_array = create_primitive_array(size, |i| match i % 2 { + 0 => 1, + _ => 0, + }); + c.bench_function("filter u8 low selectivity", |b| { + b.iter(|| bench_filter_u8(&data_array, &filter_array)) + }); + c.bench_function("filter u8 high selectivity", |b| { + b.iter(|| bench_filter_u8(&data_array, &sparse_filter_array)) + }); + + c.bench_function("filter context u8 low selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &filter_context)) + }); + c.bench_function("filter context u8 high selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context)) + }); + + let data_array = create_u8_array_with_nulls(size); + c.bench_function("filter context u8 w NULLs low selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &filter_context)) + }); + c.bench_function("filter context u8 w NULLs high selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context)) + }); + + let data_array = create_primitive_array(size, |i| match i % 2 { + 0 => 1.0, + _ => 0.0, + }); + 
c.bench_function("filter context f32 low selectivity", |b| { + b.iter(|| bench_filter_context_f32(&data_array, &filter_context)) + }); + c.bench_function("filter context f32 high selectivity", |b| { + b.iter(|| bench_filter_context_f32(&data_array, &sparse_filter_context)) + }); +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); From bf9437e12f9f636265b6999e0cc26e721c1f47d3 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sun, 19 Jul 2020 18:09:45 +0100 Subject: [PATCH 04/11] fix clippy issues --- rust/arrow/src/compute/kernels/filter.rs | 51 +++++++++++++----------- rust/arrow/src/util/bit_util.rs | 2 + 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 0e3a8ec07b2..4b37b95a181 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -24,9 +24,9 @@ use crate::record_batch::RecordBatch; use crate::{ bitmap::Bitmap, buffer::{Buffer, MutableBuffer}, + memory, util::bit_util, }; -use std::io::Write; use std::{mem, sync::Arc}; /// trait for copying filtered null bitmap bits @@ -88,7 +88,7 @@ impl<'a> NullBitSetter<'a> { impl<'a> CopyNullBit for NullBitSetter<'a> { #[inline] fn copy_null_bit(&mut self, source_index: usize) { - if bit_util::get_bit(self.source_bytes, source_index) == false { + if !bit_util::get_bit(self.source_bytes, source_index) { // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len() unsafe { bit_util::unset_bit_raw( @@ -125,52 +125,53 @@ fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box } // transmute filter array to u64 -// - optimize filtering using highly selective filters by skipping entire batches of 64 filter bits -// - copy null bit immediately -// - copy data values in batches +// - optimize filtering with highly selective filters by skipping entire batches of 64 filter bits +// - if the data array being 
filtered doesn't have a null bitmap, no time is wasted to copy a null bitmap fn filter_array_impl( filter_context: &FilterContext, data_array: &impl Array, array_type: DataType, value_size: usize, ) -> Result { + if filter_context.filter_len > data_array.len() { + return Err(ArrowError::ComputeError( + "Filter array cannot be larger than data array".to_string(), + )); + } let filtered_count = filter_context.filtered_count; let filter_mask = &filter_context.filter_mask; let filter_u64 = &filter_context.filter_u64; - let data_bytes = data_array.data_ref().buffers()[0].data(); + let data_start = data_array.data_ref().buffers()[0].raw_data(); let mut value_buffer = MutableBuffer::new(filtered_count * value_size); + value_buffer.resize(filtered_count * value_size)?; + let mut value_position = value_buffer.raw_data_mut(); let mut null_bit_setter = get_null_bit_setter(data_array); let null_bit_setter = null_bit_setter.as_mut(); - let mut copy_buffer: Vec = vec![0; 64 * value_size]; - for i in 0..filter_u64.len() { + for (i, filter_batch) in filter_u64.iter().enumerate() { // foreach u64 batch - let filter_batch = filter_u64[i]; + let filter_batch = *filter_batch; if filter_batch == 0 { // if batch == 0: skip continue; } - - let mut copy_count = 0; - for j in 0..64 { + for (j, filter_mask) in filter_mask.iter().enumerate() { // foreach bit in batch: - if (filter_batch & filter_mask[j]) != 0 { + if (filter_batch & *filter_mask) != 0 { let data_index = (i * 64) + j; null_bit_setter.copy_null_bit(data_index); // if filter bit == 1: copy data value to temp array - // let copy_buffer_start = value_size * copy_count; - // let data_buffer_start = value_size * data_index; - copy_buffer - [(value_size * copy_count)..((value_size * copy_count) + value_size)] - .copy_from_slice( - &data_bytes[(value_size * data_index) - ..((value_size * data_index) + value_size)], + unsafe { + // this should be safe because of the data_array.len() check at the beginning of the method + 
memory::memcpy( + value_position, + data_start.add(value_size * data_index), + value_size, ); - copy_count += 1; + value_position = value_position.add(value_size); + } } } - // copy temp array to output buffer - value_buffer.write(©_buffer[0..(value_size * copy_count)])?; } let mut array_data_builder = ArrayDataBuilder::new(array_type) @@ -190,6 +191,7 @@ fn filter_array_impl( #[derive(Debug)] pub struct FilterContext { filter_u64: Vec, + filter_len: usize, filtered_count: usize, filter_mask: Vec, } @@ -224,6 +226,7 @@ impl FilterContext { let filter_u64 = u64_buffer.typed_data_mut::().to_owned(); FilterContext { filter_u64, + filter_len: filter_array.len(), filtered_count, filter_mask, } @@ -450,7 +453,7 @@ pub fn filter_record_batch( .iter() .map(|a| filter_context.filter(a.as_ref()).unwrap()) .collect(); - RecordBatch::try_new(record_batch.schema().clone(), filtered_arrays) + RecordBatch::try_new(record_batch.schema(), filtered_arrays) } #[cfg(test)] diff --git a/rust/arrow/src/util/bit_util.rs b/rust/arrow/src/util/bit_util.rs index 59244f69359..d8ffa6f19c5 100644 --- a/rust/arrow/src/util/bit_util.rs +++ b/rust/arrow/src/util/bit_util.rs @@ -90,6 +90,8 @@ pub fn unset_bit(data: &mut [u8], i: usize) { /// Sets bit at position `i` for `data` to 0 /// +/// # Safety +/// /// Note this doesn't do any bound checking, for performance reason. The caller is /// responsible to guarantee that `i` is within bounds. 
#[inline] From 54340242f8c8744077694bad94e1205041041fbc Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sun, 19 Jul 2020 19:03:10 +0100 Subject: [PATCH 05/11] add license at the top of filter_kernels.rs --- rust/arrow/benches/filter_kernels.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs index f96584f5340..17db16565f7 100644 --- a/rust/arrow/benches/filter_kernels.rs +++ b/rust/arrow/benches/filter_kernels.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use arrow::array::*; use arrow::compute::{filter, FilterContext}; use arrow::datatypes::ArrowNumericType; From 223b444eb854c52b99dd5b04292c48bc848aa487 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 25 Jul 2020 16:07:35 +0100 Subject: [PATCH 06/11] remove use of unwrap from filter_record_batch method --- rust/arrow/src/compute/kernels/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 4b37b95a181..d287c4b603f 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -451,8 +451,8 @@ pub fn filter_record_batch( let filtered_arrays = record_batch .columns() .iter() - .map(|a| filter_context.filter(a.as_ref()).unwrap()) - .collect(); + .map(|a| filter_context.filter(a.as_ref())) + .collect::>>()?; RecordBatch::try_new(record_batch.schema(), filtered_arrays) } From 5383a9f945db6b95e849dd3b6fef8b6eb88910a7 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 25 Jul 2020 17:07:49 +0100 Subject: [PATCH 07/11] remove use of unsafe from copy_null_bit method --- rust/arrow/src/compute/kernels/filter.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index d287c4b603f..d62a99a55b7 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -89,13 +89,7 @@ impl<'a> CopyNullBit for NullBitSetter<'a> { #[inline] fn copy_null_bit(&mut self, source_index: usize) { if !bit_util::get_bit(self.source_bytes, source_index) { - // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len() - unsafe { - bit_util::unset_bit_raw( - self.target_buffer.raw_data_mut(), - self.target_index, - ); - } + bit_util::unset_bit(self.target_buffer.data_mut(), self.target_index); self.null_count += 1; } self.target_index += 1; From 
c84d3ba3f09ee32cac6eb2fea05bf399c69adb21 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 25 Jul 2020 19:27:51 +0100 Subject: [PATCH 08/11] change filter_array_impl method to add special case for all 1s batches of filter values --- rust/arrow/benches/filter_kernels.rs | 17 ++++++ rust/arrow/src/compute/kernels/filter.rs | 76 ++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs index 17db16565f7..ffb18b31d5c 100644 --- a/rust/arrow/benches/filter_kernels.rs +++ b/rust/arrow/benches/filter_kernels.rs @@ -89,9 +89,14 @@ fn add_benchmark(c: &mut Criterion) { 0 => true, _ => false, }); + let dense_filter_array = create_bool_array(size, |i| match i % 8000 { + 0 => false, + _ => true, + }); let filter_context = FilterContext::new(&filter_array); let sparse_filter_context = FilterContext::new(&sparse_filter_array); + let dense_filter_context = FilterContext::new(&dense_filter_array); let data_array = create_primitive_array(size, |i| match i % 2 { 0 => 1, @@ -103,6 +108,9 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("filter u8 high selectivity", |b| { b.iter(|| bench_filter_u8(&data_array, &sparse_filter_array)) }); + c.bench_function("filter u8 very low selectivity", |b| { + b.iter(|| bench_filter_u8(&data_array, &dense_filter_array)) + }); c.bench_function("filter context u8 low selectivity", |b| { b.iter(|| bench_filter_context_u8(&data_array, &filter_context)) @@ -110,6 +118,9 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("filter context u8 high selectivity", |b| { b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context)) }); + c.bench_function("filter context u8 very low selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &dense_filter_context)) + }); let data_array = create_u8_array_with_nulls(size); c.bench_function("filter context u8 w NULLs low selectivity", |b| { @@ -118,6 +129,9 @@ fn add_benchmark(c: &mut 
Criterion) { c.bench_function("filter context u8 w NULLs high selectivity", |b| { b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context)) }); + c.bench_function("filter context u8 w NULLs very low selectivity", |b| { + b.iter(|| bench_filter_context_u8(&data_array, &dense_filter_context)) + }); let data_array = create_primitive_array(size, |i| match i % 2 { 0 => 1.0, @@ -129,6 +143,9 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("filter context f32 high selectivity", |b| { b.iter(|| bench_filter_context_f32(&data_array, &sparse_filter_context)) }); + c.bench_function("filter context f32 very low selectivity", |b| { + b.iter(|| bench_filter_context_f32(&data_array, &dense_filter_context)) + }); } criterion_group!(benches, add_benchmark); diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index d62a99a55b7..eecbbe4e98b 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -32,6 +32,7 @@ use std::{mem, sync::Arc}; /// trait for copying filtered null bitmap bits trait CopyNullBit { fn copy_null_bit(&mut self, source_index: usize); + fn advance(&mut self, count: usize); fn null_count(&self) -> usize; fn null_buffer(&mut self) -> Buffer; } @@ -52,6 +53,11 @@ impl CopyNullBit for NullBitNoop { // do nothing } + #[inline] + fn advance(&mut self, _count: usize) { + // do nothing + } + fn null_count(&self) -> usize { 0 } @@ -95,6 +101,13 @@ impl<'a> CopyNullBit for NullBitSetter<'a> { self.target_index += 1; } + #[inline] + fn advance(&mut self, count: usize) { + // only increment target_index + // no need to set any null bits because target_buffer is initialized with all 1s + self.target_index += count; + } + fn null_count(&self) -> usize { self.null_count } @@ -141,6 +154,7 @@ fn filter_array_impl( let mut value_position = value_buffer.raw_data_mut(); let mut null_bit_setter = get_null_bit_setter(data_array); let null_bit_setter = 
null_bit_setter.as_mut(); + let all_ones_batch = !0u64; for (i, filter_batch) in filter_u64.iter().enumerate() { // foreach u64 batch @@ -148,6 +162,21 @@ fn filter_array_impl( if filter_batch == 0 { // if batch == 0: skip continue; + } else if filter_batch == all_ones_batch { + // if batch == all 1s: copy all 64 values in one go + null_bit_setter.advance(64); + let data_index = i * 64; + let data_len = value_size * 64; + unsafe { + // this should be safe because of the data_array.len() check at the beginning of the method + memory::memcpy( + value_position, + data_start.add(value_size * data_index), + data_len, + ); + value_position = value_position.add(data_len); + } + continue; } for (j, filter_mask) in filter_mask.iter().enumerate() { // foreach bit in batch: @@ -551,6 +580,53 @@ mod tests { assert_eq!(8, d.value(1)); } + #[test] + fn test_filter_array_low_density() { + // this test exercises the all 0's branch of the filter algorithm + let mut data_values = (1..=65).into_iter().collect::>(); + let mut filter_values = (1..=65) + .into_iter() + .map(|i| match i % 65 { + 0 => true, + _ => false, + }) + .collect::>(); + // set up two more values after the batch + data_values.extend_from_slice(&[66, 67]); + filter_values.extend_from_slice(&[false, true]); + let a = Int32Array::from(data_values); + let b = BooleanArray::from(filter_values); + let c = filter(&a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + assert_eq!(2, d.len()); + assert_eq!(65, d.value(0)); + assert_eq!(67, d.value(1)); + } + + #[test] + fn test_filter_array_high_density() { + // this test exercises the all 1's branch of the filter algorithm + let mut data_values = (1..=65).into_iter().collect::>(); + let mut filter_values = (1..=65) + .into_iter() + .map(|i| match i % 65 { + 0 => false, + _ => true, + }) + .collect::>(); + // set up two more values after the batch + data_values.extend_from_slice(&[66, 67]); + filter_values.extend_from_slice(&[false, true]); + let a = 
Int32Array::from(data_values); + let b = BooleanArray::from(filter_values); + let c = filter(&a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + assert_eq!(65, d.len()); + assert_eq!(1, d.value(0)); + assert_eq!(64, d.value(63)); + assert_eq!(67, d.value(64)); + } + #[test] fn test_filter_string_array() { let a = StringArray::from(vec!["hello", " ", "world", "!"]); From 3b470d3c0da1dfe4775d38e07953c72103d5be8c Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sun, 26 Jul 2020 12:04:08 +0100 Subject: [PATCH 09/11] fix issue where null bitmap was not copied for batches of all 1s filter values --- rust/arrow/src/compute/kernels/filter.rs | 29 ++++++++++++++---------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index eecbbe4e98b..f2632b3f386 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -32,7 +32,7 @@ use std::{mem, sync::Arc}; /// trait for copying filtered null bitmap bits trait CopyNullBit { fn copy_null_bit(&mut self, source_index: usize); - fn advance(&mut self, count: usize); + fn copy_null_bits(&mut self, source_index: usize, count: usize); fn null_count(&self) -> usize; fn null_buffer(&mut self) -> Buffer; } @@ -54,7 +54,7 @@ impl CopyNullBit for NullBitNoop { } #[inline] - fn advance(&mut self, _count: usize) { + fn copy_null_bits(&mut self, _source_index: usize, _count: usize) { // do nothing } @@ -102,10 +102,10 @@ impl<'a> CopyNullBit for NullBitSetter<'a> { } #[inline] - fn advance(&mut self, count: usize) { - // only increment target_index - // no need to set any null bits because target_buffer is initialized with all 1s - self.target_index += count; + fn copy_null_bits(&mut self, source_index: usize, count: usize) { + for i in 0..count { + self.copy_null_bit(source_index + i); + } } fn null_count(&self) -> usize { @@ -164,9 +164,9 @@ fn filter_array_impl( continue; } 
else if filter_batch == all_ones_batch { // if batch == all 1s: copy all 64 values in one go - null_bit_setter.advance(64); let data_index = i * 64; let data_len = value_size * 64; + null_bit_setter.copy_null_bits(data_index, 64); unsafe { // this should be safe because of the data_array.len() check at the beginning of the method memory::memcpy( @@ -606,7 +606,7 @@ mod tests { #[test] fn test_filter_array_high_density() { // this test exercises the all 1's branch of the filter algorithm - let mut data_values = (1..=65).into_iter().collect::>(); + let mut data_values = (1..=65).into_iter().map(|x| Some(x)).collect::>(); let mut filter_values = (1..=65) .into_iter() .map(|i| match i % 65 { @@ -614,17 +614,22 @@ mod tests { _ => true, }) .collect::>(); + // set second data value to null + data_values[1] = None; // set up two more values after the batch - data_values.extend_from_slice(&[66, 67]); - filter_values.extend_from_slice(&[false, true]); + data_values.extend_from_slice(&[Some(66), None, Some(67), None]); + filter_values.extend_from_slice(&[false, true, true, true]); let a = Int32Array::from(data_values); let b = BooleanArray::from(filter_values); let c = filter(&a, &b).unwrap(); let d = c.as_ref().as_any().downcast_ref::().unwrap(); - assert_eq!(65, d.len()); + assert_eq!(67, d.len()); + assert_eq!(3, d.null_count()); assert_eq!(1, d.value(0)); + assert_eq!(true, d.is_null(1)); assert_eq!(64, d.value(63)); - assert_eq!(67, d.value(64)); + assert_eq!(true, d.is_null(64)); + assert_eq!(67, d.value(65)); } #[test] From 6073fb79cc85bcae348552706baf4c90ab0cb934 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Mon, 27 Jul 2020 21:21:23 +0100 Subject: [PATCH 10/11] replace unsafe memory::memcpy with copy_from_slice --- rust/arrow/src/compute/kernels/filter.rs | 42 ++++++++++-------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index f2632b3f386..ab5e8e66853 
100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -24,7 +24,6 @@ use crate::record_batch::RecordBatch; use crate::{ bitmap::Bitmap, buffer::{Buffer, MutableBuffer}, - memory, util::bit_util, }; use std::{mem, sync::Arc}; @@ -148,10 +147,11 @@ fn filter_array_impl( let filtered_count = filter_context.filtered_count; let filter_mask = &filter_context.filter_mask; let filter_u64 = &filter_context.filter_u64; - let data_start = data_array.data_ref().buffers()[0].raw_data(); - let mut value_buffer = MutableBuffer::new(filtered_count * value_size); - value_buffer.resize(filtered_count * value_size)?; - let mut value_position = value_buffer.raw_data_mut(); + let data_bytes = data_array.data_ref().buffers()[0].data(); + let mut target_buffer = MutableBuffer::new(filtered_count * value_size); + target_buffer.resize(filtered_count * value_size)?; + let target_bytes = target_buffer.data_mut(); + let mut target_byte_index: usize = 0; let mut null_bit_setter = get_null_bit_setter(data_array); let null_bit_setter = null_bit_setter.as_mut(); let all_ones_batch = !0u64; @@ -165,17 +165,14 @@ fn filter_array_impl( } else if filter_batch == all_ones_batch { // if batch == all 1s: copy all 64 values in one go let data_index = i * 64; - let data_len = value_size * 64; null_bit_setter.copy_null_bits(data_index, 64); - unsafe { - // this should be safe because of the data_array.len() check at the beginning of the method - memory::memcpy( - value_position, - data_start.add(value_size * data_index), - data_len, + let data_byte_index = data_index * value_size; + let data_len = value_size * 64; + target_bytes[target_byte_index..(target_byte_index + data_len)] + .copy_from_slice( + &data_bytes[data_byte_index..(data_byte_index + data_len)], ); - value_position = value_position.add(data_len); - } + target_byte_index += data_len; continue; } for (j, filter_mask) in filter_mask.iter().enumerate() { @@ -183,23 +180,20 @@ fn filter_array_impl( 
if (filter_batch & *filter_mask) != 0 { let data_index = (i * 64) + j; null_bit_setter.copy_null_bit(data_index); - // if filter bit == 1: copy data value to temp array - unsafe { - // this should be safe because of the data_array.len() check at the beginning of the method - memory::memcpy( - value_position, - data_start.add(value_size * data_index), - value_size, + // if filter bit == 1: copy data value bytes + let data_byte_index = data_index * value_size; + target_bytes[target_byte_index..(target_byte_index + value_size)] + .copy_from_slice( + &data_bytes[data_byte_index..(data_byte_index + value_size)], ); - value_position = value_position.add(value_size); - } + target_byte_index += value_size; } } } let mut array_data_builder = ArrayDataBuilder::new(array_type) .len(filtered_count) - .add_buffer(value_buffer.freeze()); + .add_buffer(target_buffer.freeze()); if null_bit_setter.null_count() > 0 { array_data_builder = array_data_builder .null_count(null_bit_setter.null_count()) From d2179be80acb4852bc2c164e358c1751a1769656 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Fri, 31 Jul 2020 21:23:07 +0100 Subject: [PATCH 11/11] implemented support for filtering sliced data arrays --- rust/arrow/benches/filter_kernels.rs | 6 +-- rust/arrow/src/compute/kernels/filter.rs | 59 +++++++++++++++++++----- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs index ffb18b31d5c..75c04352c0a 100644 --- a/rust/arrow/benches/filter_kernels.rs +++ b/rust/arrow/benches/filter_kernels.rs @@ -94,9 +94,9 @@ fn add_benchmark(c: &mut Criterion) { _ => true, }); - let filter_context = FilterContext::new(&filter_array); - let sparse_filter_context = FilterContext::new(&sparse_filter_array); - let dense_filter_context = FilterContext::new(&dense_filter_array); + let filter_context = FilterContext::new(&filter_array).unwrap(); + let sparse_filter_context = 
FilterContext::new(&sparse_filter_array).unwrap(); + let dense_filter_context = FilterContext::new(&dense_filter_array).unwrap(); let data_array = create_primitive_array(size, |i| match i % 2 { 0 => 1, diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index ab5e8e66853..98d70f05ced 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -155,6 +155,7 @@ fn filter_array_impl( let mut null_bit_setter = get_null_bit_setter(data_array); let null_bit_setter = null_bit_setter.as_mut(); let all_ones_batch = !0u64; + let data_array_offset = data_array.offset(); for (i, filter_batch) in filter_u64.iter().enumerate() { // foreach u64 batch @@ -164,7 +165,7 @@ fn filter_array_impl( continue; } else if filter_batch == all_ones_batch { // if batch == all 1s: copy all 64 values in one go - let data_index = i * 64; + let data_index = (i * 64) + data_array_offset; null_bit_setter.copy_null_bits(data_index, 64); let data_byte_index = data_index * value_size; let data_len = value_size * 64; @@ -178,7 +179,7 @@ fn filter_array_impl( for (j, filter_mask) in filter_mask.iter().enumerate() { // foreach bit in batch: if (filter_batch & *filter_mask) != 0 { - let data_index = (i * 64) + j; + let data_index = (i * 64) + j + data_array_offset; null_bit_setter.copy_null_bit(data_index); // if filter bit == 1: copy data value bytes let data_byte_index = data_index * value_size; @@ -231,22 +232,26 @@ macro_rules! 
filter_dictionary_array { impl FilterContext { /// Returns a new instance of FilterContext - pub fn new(filter_array: &BooleanArray) -> Self { + pub fn new(filter_array: &BooleanArray) -> Result { + if filter_array.offset() > 0 { + return Err(ArrowError::ComputeError( + "Filter array cannot have offset > 0".to_string(), + )); + } let filter_mask: Vec = (0..64).map(|x| 1u64 << x).collect(); let filter_bytes = filter_array.data_ref().buffers()[0].data(); let filtered_count = bit_util::count_set_bits(filter_bytes); // transmute filter_bytes to &[u64] let mut u64_buffer = MutableBuffer::new(filter_bytes.len()); u64_buffer - .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len()) - .unwrap(); + .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len())?; let filter_u64 = u64_buffer.typed_data_mut::().to_owned(); - FilterContext { + Ok(FilterContext { filter_u64, filter_len: filter_array.len(), filtered_count, filter_mask, - } + }) } /// Returns a new array, containing only the elements matching the filter @@ -431,7 +436,7 @@ impl FilterContext { /// Returns a new array, containing only the elements matching the filter. 
pub fn filter(array: &Array, filter: &BooleanArray) -> Result { - FilterContext::new(filter).filter(array) + FilterContext::new(filter)?.filter(array) } /// Returns a new PrimitiveArray containing only those values from the array passed as the data_array parameter, @@ -443,7 +448,7 @@ pub fn filter_primitive_array( where T: ArrowNumericType, { - FilterContext::new(filter_array).filter_primitive_array(data_array) + FilterContext::new(filter_array)?.filter_primitive_array(data_array) } /// Returns a new DictionaryArray containing only those keys from the array passed as the data_array parameter, @@ -455,7 +460,7 @@ pub fn filter_dictionary_array( where T: ArrowNumericType, { - FilterContext::new(filter_array).filter_dictionary_array(data_array) + FilterContext::new(filter_array)?.filter_dictionary_array(data_array) } /// Returns a new RecordBatch with arrays containing only values matching the filter. @@ -464,7 +469,7 @@ pub fn filter_record_batch( record_batch: &RecordBatch, filter_array: &BooleanArray, ) -> Result { - let filter_context = FilterContext::new(filter_array); + let filter_context = FilterContext::new(filter_array)?; let filtered_arrays = record_batch .columns() .iter() @@ -574,6 +579,21 @@ mod tests { assert_eq!(8, d.value(1)); } + #[test] + fn test_filter_array_slice() { + let a_slice = Int32Array::from(vec![5, 6, 7, 8, 9]).slice(1, 4); + let a = a_slice.as_ref(); + let b = BooleanArray::from(vec![true, false, false, true]); + // filtering with sliced filter array is not currently supported + // let b_slice = BooleanArray::from(vec![true, false, false, true, false]).slice(1, 4); + // let b = b_slice.as_any().downcast_ref().unwrap(); + let c = filter(a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + assert_eq!(2, d.len()); + assert_eq!(6, d.value(0)); + assert_eq!(9, d.value(1)); + } + #[test] fn test_filter_array_low_density() { // this test exercises the all 0's branch of the filter algorithm @@ -647,6 +667,23 @@ mod tests { 
assert_eq!(true, d.is_null(0)); } + #[test] + fn test_filter_array_slice_with_null() { + let a_slice = + Int32Array::from(vec![Some(5), None, Some(7), Some(8), Some(9)]).slice(1, 4); + let a = a_slice.as_ref(); + let b = BooleanArray::from(vec![true, false, false, true]); + // filtering with sliced filter array is not currently supported + // let b_slice = BooleanArray::from(vec![true, false, false, true, false]).slice(1, 4); + // let b = b_slice.as_any().downcast_ref().unwrap(); + let c = filter(a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + assert_eq!(2, d.len()); + assert_eq!(true, d.is_null(0)); + assert_eq!(false, d.is_null(1)); + assert_eq!(9, d.value(1)); + } + #[test] fn test_filter_dictionary_array() { let values = vec![Some("hello"), None, Some("world"), Some("!")];