From 290607d4d9766b6b8aec6690fede1154a23ca182 Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Tue, 9 Feb 2021 01:47:12 +0000 Subject: [PATCH 1/7] Implement a SISD divide_scalar op --- rust/arrow/src/compute/kernels/arithmetic.rs | 63 ++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 067756662cf..009486f2db7 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -256,6 +256,49 @@ where Ok(PrimitiveArray::::from(Arc::new(data))) } +/// Scalar-divisor version of `math_divide`. +fn math_divide_scalar( + array: &PrimitiveArray, + divisor: T::Native, +) -> Result> +where + T: ArrowNumericType, + T::Native: Div + Zero, +{ + if divisor.is_zero() { + return Err(ArrowError::DivideByZero); + } + + let null_bit_buffer = array.data_ref().null_buffer().cloned(); + + let buffer = if let Some(b) = &null_bit_buffer { + let values = array.values().iter().enumerate().map(|(i, value)| { + let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; + if is_valid { + *value / divisor + } else { + T::default_value() + } + }); + unsafe { Buffer::from_trusted_len_iter(values) } + } else { + // no value is null + let values = array.values().iter().map(|value| *value / divisor); + unsafe { Buffer::from_trusted_len_iter(values) } + }; + + let data = ArrayData::new( + T::DATA_TYPE, + array.len(), + None, + null_bit_buffer, + 0, + vec![buffer], + vec![], + ); + Ok(PrimitiveArray::::from(Arc::new(data))) +} + /// SIMD vectorized version of `math_op` above. #[cfg(simd)] fn simd_math_op( @@ -622,6 +665,26 @@ where return math_divide(&left, &right); } +/// Divide every value in an array by a scalar. If any value in the array is null then the +/// result is also null. If the scalar is zero then the result of this operation will be +/// `Err(ArrowError::DivideByZero)`. +pub fn divide_scalar( + array: &PrimitiveArray, + divisor: T::Native, +) -> Result> +where + T: datatypes::ArrowNumericType, + T::Native: Add + + Sub + + Mul + + Div + + Zero + + One, +{ + #[cfg(not(simd))] + return math_divide_scalar(&array, divisor); +} + #[cfg(test)] mod tests { use super::*; From cd16016f787493ca79c22cca96a1d026c20a58db Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Tue, 9 Feb 2021 01:49:38 +0000 Subject: [PATCH 2/7] Implement a SIMD divide_scalar op --- rust/arrow/src/compute/kernels/arithmetic.rs | 133 +++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 009486f2db7..d2a6e04b964 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -430,6 +430,38 @@ where Ok(()) } +/// Scalar-divisor version of `simd_checked_divide_remainder`. +#[cfg(simd)] +#[inline] +fn simd_checked_divide_scalar_remainder( + valid_mask: Option, + array_chunks: ChunksExact, + divisor: T::Native, + result_chunks: ChunksExactMut, +) -> Result<()> +where + T::Native: Zero + Div, +{ + if divisor.is_zero() { + return Err(ArrowError::DivideByZero); + } + + let result_remainder = result_chunks.into_remainder(); + let array_remainder = array_chunks.remainder(); + + result_remainder + .iter_mut() + .zip(array_remainder.iter()) + .enumerate() + .for_each(|(i, (result_scalar, array_scalar))| { + if valid_mask.map(|mask| mask & (1 << i) != 0).unwrap_or(true) { + *result_scalar = *array_scalar / divisor; + } + }); + + Ok(()) +} + /// SIMD vectorized version of `divide`, the divide kernel needs it's own implementation as there /// is a need to handle situations where a divide by `0` occurs. This is complicated by `NULL` /// slots and padding. @@ -549,6 +581,105 @@ where Ok(PrimitiveArray::::from(Arc::new(data))) } +/// SIMD vectorized version of `divide_scalar`. +#[cfg(simd)] +fn simd_divide_scalar( + array: &PrimitiveArray, + divisor: T::Native, +) -> Result> +where + T: ArrowNumericType, + T::Native: One + Zero + Div, +{ + if divisor.is_zero() { + return Err(ArrowError::DivideByZero); + } + + // Get the bitmap of nulls for the array. + let null_bit_buffer = array.data_ref().null_buffer().cloned(); + + let lanes = T::lanes(); + let buffer_size = array.len() * std::mem::size_of::(); + let mut result = MutableBuffer::new(buffer_size).with_bitset(buffer_size, false); + + match &null_bit_buffer { + Some(b) => { + // combine_option_bitmap returns a slice or new buffer starting at 0 + let valid_chunks = b.bit_chunks(0, array.len()); + + // process data in chunks of 64 elements since we also get 64 bits of validity information at a time + let mut result_chunks = result.typed_data_mut().chunks_exact_mut(64); + let mut array_chunks = array.values().chunks_exact(64); + + valid_chunks + .iter() + .zip(result_chunks.borrow_mut().zip(array_chunks.borrow_mut())) + .for_each(|(mut mask, (result_slice, array_slice))| { + // split chunks further into slices corresponding to the vector length + // the compiler is able to unroll this inner loop and remove bounds checks + // since the outer chunk size (64) is always a multiple of the number of lanes + result_slice + .chunks_exact_mut(lanes) + .zip(array_slice.chunks_exact(lanes)) + .for_each(|(result_slice, array_slice)| { + let simd_left = T::load(array_slice); + let simd_right = T::init(divisor); + + // We know `divisor` is non-zero, so we can skip `simd_checked_divide()` + // and just do the op. + let simd_result = + T::bin_op(simd_left, simd_right, |a, b| a / b); + T::write(simd_result, result_slice); + + // skip the shift and avoid overflow for u8 type, which uses 64 lanes. + mask >>= T::lanes() % 64; + }) + }); + + let valid_remainder = valid_chunks.remainder_bits(); + simd_checked_divide_scalar_remainder::( + Some(valid_remainder), + array_chunks, + divisor, + result_chunks, + )?; + } + None => { + let mut result_chunks = result.typed_data_mut().chunks_exact_mut(lanes); + let mut array_chunks = array.values().chunks_exact(lanes); + + result_chunks + .borrow_mut() + .zip(array_chunks.borrow_mut()) + .for_each(|(result_slice, array_slice)| { + let simd_left = T::load(array_slice); + let simd_right = T::init(divisor); + + let simd_result = T::bin_op(simd_left, simd_right, |a, b| a / b); + T::write(simd_result, result_slice); + }); + + simd_checked_divide_scalar_remainder::( + None, + array_chunks, + divisor, + result_chunks, + )?; + } + } + + let data = ArrayData::new( + T::DATA_TYPE, + array.len(), + None, + null_bit_buffer, + 0, + vec![result.into()], + vec![], + ); + Ok(PrimitiveArray::::from(Arc::new(data))) +} + /// Perform `left + right` operation on two arrays. If either left or right value is null /// then the result is also null. pub fn add( @@ -681,6 +812,8 @@ where + Zero + One, { + #[cfg(simd)] + return simd_divide_scalar(&array, divisor); #[cfg(not(simd))] return math_divide_scalar(&array, divisor); } From 4013eceeb5060f2cbc962157272dc0088e86d5ff Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Tue, 9 Feb 2021 01:38:25 +0000 Subject: [PATCH 3/7] Add tests and benchmarks --- rust/arrow/benches/arithmetic_kernels.rs | 12 ++++++++++++ rust/arrow/src/compute/kernels/arithmetic.rs | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/rust/arrow/benches/arithmetic_kernels.rs b/rust/arrow/benches/arithmetic_kernels.rs index a1e6ad97664..862866603fc 100644 --- a/rust/arrow/benches/arithmetic_kernels.rs +++ b/rust/arrow/benches/arithmetic_kernels.rs @@ -58,6 +58,11 @@ fn bench_divide(arr_a: &ArrayRef, arr_b: &ArrayRef) { criterion::black_box(divide(&arr_a, &arr_b).unwrap()); } +fn bench_divide_scalar(array: &ArrayRef, divisor: f32) { + let array = array.as_any().downcast_ref::().unwrap(); + criterion::black_box(divide_scalar(&array, divisor).unwrap()); +} + fn bench_limit(arr_a: &ArrayRef, max: usize) { criterion::black_box(limit(arr_a, max)); } @@ -65,6 +70,7 @@ fn bench_limit(arr_a: &ArrayRef, max: usize) { fn add_benchmark(c: &mut Criterion) { let arr_a = create_array(512, false); let arr_b = create_array(512, false); + let scalar = 1.12358; c.bench_function("add 512", |b| b.iter(|| bench_add(&arr_a, &arr_b))); c.bench_function("subtract 512", |b| { @@ -74,6 +80,9 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_multiply(&arr_a, &arr_b)) }); c.bench_function("divide 512", |b| b.iter(|| bench_divide(&arr_a, &arr_b))); + c.bench_function("divide_scalar 512", |b| { + b.iter(|| bench_divide_scalar(&arr_a, scalar)) + }); c.bench_function("limit 512, 512", |b| b.iter(|| bench_limit(&arr_a, 512))); let arr_a_nulls = create_array(512, false); @@ -84,6 +93,9 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("divide_nulls_512", |b| { b.iter(|| bench_divide(&arr_a_nulls, &arr_b_nulls)) }); + c.bench_function("divide_scalar_nulls_512", |b| { + b.iter(|| bench_divide_scalar(&arr_a_nulls, scalar)) + }); } criterion_group!(benches, add_benchmark); diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index d2a6e04b964..72a82b5dc66 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -905,6 +905,18 @@ mod tests { assert_eq!(9, c.value(4)); } + #[test] + fn test_primitive_array_divide_scalar() { + let a = Int32Array::from(vec![15, 14, 9, 8, 1]); + let b = 3; + let c = divide_scalar(&a, b).unwrap(); + assert_eq!(5, c.value(0)); + assert_eq!(4, c.value(1)); + assert_eq!(3, c.value(2)); + assert_eq!(2, c.value(3)); + assert_eq!(0, c.value(4)); + } + #[test] fn test_primitive_array_divide_sliced() { let a = Int32Array::from(vec![0, 0, 0, 15, 15, 8, 1, 9, 0]); From 7a155908d84eae37fb1cd1657bb8639221c22e3b Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Thu, 11 Feb 2021 20:30:18 +0000 Subject: [PATCH 4/7] Generate a random scalar in benchmarks --- rust/arrow/benches/arithmetic_kernels.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/arrow/benches/arithmetic_kernels.rs b/rust/arrow/benches/arithmetic_kernels.rs index 862866603fc..721157e2846 100644 --- a/rust/arrow/benches/arithmetic_kernels.rs +++ b/rust/arrow/benches/arithmetic_kernels.rs @@ -18,15 +18,16 @@ #[macro_use] extern crate criterion; use criterion::Criterion; +use rand::Rng; use std::sync::Arc; extern crate arrow; -use arrow::compute::kernels::arithmetic::*; use arrow::compute::kernels::limit::*; use arrow::util::bench_util::*; use arrow::{array::*, datatypes::Float32Type}; +use arrow::{compute::kernels::arithmetic::*, util::test_util::seedable_rng}; fn create_array(size: usize, with_nulls: bool) -> ArrayRef { let null_density = if with_nulls { 0.5 } else { 0.0 }; @@ -70,7 +71,7 @@ fn bench_limit(arr_a: &ArrayRef, max: usize) { fn add_benchmark(c: &mut Criterion) { let arr_a = create_array(512, false); let arr_b = create_array(512, false); - let scalar = 1.12358; + let scalar = seedable_rng().gen(); c.bench_function("add 512", |b| b.iter(|| bench_add(&arr_a, &arr_b))); c.bench_function("subtract 512", |b| { From 3a157bfd904e424a1b623d9c1fa7ec7f55463f2e Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Tue, 9 Feb 2021 01:50:30 +0000 Subject: [PATCH 5/7] Improve a couple doc comments --- rust/arrow/src/buffer/immutable.rs | 2 +- rust/arrow/src/compute/kernels/arithmetic.rs | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/buffer/immutable.rs b/rust/arrow/src/buffer/immutable.rs index df5690c06bf..e96bc003c8b 100644 --- a/rust/arrow/src/buffer/immutable.rs +++ b/rust/arrow/src/buffer/immutable.rs @@ -293,7 +293,7 @@ impl Buffer { /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors /// if any of the items of the iterator is an error. - /// Prefer this to `collect` whenever possible, as it is faster ~60% faster. + /// Prefer this to `collect` whenever possible, as it is ~60% faster. /// # Safety /// This method assumes that the iterator's size is correct and is undefined behavior /// to use it on an iterator that reports an incorrect length. diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 72a82b5dc66..76d614e8b53 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -462,9 +462,10 @@ where Ok(()) } -/// SIMD vectorized version of `divide`, the divide kernel needs it's own implementation as there -/// is a need to handle situations where a divide by `0` occurs. This is complicated by `NULL` -/// slots and padding. +/// SIMD vectorized version of `divide`. +/// +/// The divide kernels need their own implementation as there is a need to handle situations +/// where a divide by `0` occurs. This is complicated by `NULL` slots and padding. #[cfg(simd)] fn simd_divide( left: &PrimitiveArray, From f3a73070715b9808b86c887321df90338b022834 Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Sat, 20 Feb 2021 22:00:40 +0000 Subject: [PATCH 6/7] Skip a few more null checks --- rust/arrow/src/compute/kernels/arithmetic.rs | 108 ++++--------------- 1 file changed, 18 insertions(+), 90 deletions(-) diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 76d614e8b53..75afe6d5c52 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -269,29 +269,14 @@ where return Err(ArrowError::DivideByZero); } - let null_bit_buffer = array.data_ref().null_buffer().cloned(); - - let buffer = if let Some(b) = &null_bit_buffer { - let values = array.values().iter().enumerate().map(|(i, value)| { - let is_valid = unsafe { bit_util::get_bit_raw(b.as_ptr(), i) }; - if is_valid { - *value / divisor - } else { - T::default_value() - } - }); - unsafe { Buffer::from_trusted_len_iter(values) } - } else { - // no value is null - let values = array.values().iter().map(|value| *value / divisor); - unsafe { Buffer::from_trusted_len_iter(values) } - }; + let values = array.values().iter().map(|value| *value / divisor); + let buffer = unsafe { Buffer::from_trusted_len_iter(values) }; let data = ArrayData::new( T::DATA_TYPE, array.len(), None, - null_bit_buffer, + array.data_ref().null_buffer().cloned(), 0, vec![buffer], vec![], @@ -434,7 +419,6 @@ where #[cfg(simd)] #[inline] fn simd_checked_divide_scalar_remainder( - valid_mask: Option, array_chunks: ChunksExact, divisor: T::Native, result_chunks: ChunksExactMut, @@ -452,11 +436,8 @@ where result_remainder .iter_mut() .zip(array_remainder.iter()) - .enumerate() - .for_each(|(i, (result_scalar, array_scalar))| { - if valid_mask.map(|mask| mask & (1 << i) != 0).unwrap_or(true) { - *result_scalar = *array_scalar / divisor; - } + .for_each(|(result_scalar, array_scalar)| { + *result_scalar = *array_scalar / divisor; }); Ok(()) @@ -596,84 +577,31 @@ where return Err(ArrowError::DivideByZero); } - // Get the bitmap of nulls for the array. - let null_bit_buffer = array.data_ref().null_buffer().cloned(); - let lanes = T::lanes(); let buffer_size = array.len() * std::mem::size_of::(); let mut result = MutableBuffer::new(buffer_size).with_bitset(buffer_size, false); - match &null_bit_buffer { - Some(b) => { - // combine_option_bitmap returns a slice or new buffer starting at 0 - let valid_chunks = b.bit_chunks(0, array.len()); - - // process data in chunks of 64 elements since we also get 64 bits of validity information at a time - let mut result_chunks = result.typed_data_mut().chunks_exact_mut(64); - let mut array_chunks = array.values().chunks_exact(64); - - valid_chunks - .iter() - .zip(result_chunks.borrow_mut().zip(array_chunks.borrow_mut())) - .for_each(|(mut mask, (result_slice, array_slice))| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - result_slice - .chunks_exact_mut(lanes) - .zip(array_slice.chunks_exact(lanes)) - .for_each(|(result_slice, array_slice)| { - let simd_left = T::load(array_slice); - let simd_right = T::init(divisor); - - // We know `divisor` is non-zero, so we can skip `simd_checked_divide()` - // and just do the op. - let simd_result = - T::bin_op(simd_left, simd_right, |a, b| a / b); - T::write(simd_result, result_slice); - - // skip the shift and avoid overflow for u8 type, which uses 64 lanes. - mask >>= T::lanes() % 64; - }) - }); - - let valid_remainder = valid_chunks.remainder_bits(); - simd_checked_divide_scalar_remainder::( - Some(valid_remainder), - array_chunks, - divisor, - result_chunks, - )?; - } - None => { - let mut result_chunks = result.typed_data_mut().chunks_exact_mut(lanes); - let mut array_chunks = array.values().chunks_exact(lanes); + let mut result_chunks = result.typed_data_mut().chunks_exact_mut(lanes); + let mut array_chunks = array.values().chunks_exact(lanes); - result_chunks - .borrow_mut() - .zip(array_chunks.borrow_mut()) - .for_each(|(result_slice, array_slice)| { - let simd_left = T::load(array_slice); - let simd_right = T::init(divisor); + result_chunks + .borrow_mut() + .zip(array_chunks.borrow_mut()) + .for_each(|(result_slice, array_slice)| { + let simd_left = T::load(array_slice); + let simd_right = T::init(divisor); - let simd_result = T::bin_op(simd_left, simd_right, |a, b| a / b); - T::write(simd_result, result_slice); - }); + let simd_result = T::bin_op(simd_left, simd_right, |a, b| a / b); + T::write(simd_result, result_slice); + }); - simd_checked_divide_scalar_remainder::( - None, - array_chunks, - divisor, - result_chunks, - )?; - } - } + simd_checked_divide_scalar_remainder::(array_chunks, divisor, result_chunks)?; let data = ArrayData::new( T::DATA_TYPE, array.len(), None, - null_bit_buffer, + array.data_ref().null_buffer().cloned(), 0, vec![result.into()], vec![], From 6b5202605e1582d0139037ac5d323940fe5c8176 Mon Sep 17 00:00:00 2001 From: Andre Braga Reis Date: Sat, 20 Feb 2021 22:00:51 +0000 Subject: [PATCH 7/7] Expand test coverage --- rust/arrow/src/compute/kernels/arithmetic.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 75afe6d5c52..a40e5ea4308 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -839,11 +839,8 @@ mod tests { let a = Int32Array::from(vec![15, 14, 9, 8, 1]); let b = 3; let c = divide_scalar(&a, b).unwrap(); - assert_eq!(5, c.value(0)); - assert_eq!(4, c.value(1)); - assert_eq!(3, c.value(2)); - assert_eq!(2, c.value(3)); - assert_eq!(0, c.value(4)); + let expected = Int32Array::from(vec![5, 4, 3, 2, 0]); + assert_eq!(c, expected); } #[test] @@ -877,6 +874,16 @@ mod tests { assert_eq!(true, c.is_null(5)); } + #[test] + fn test_primitive_array_divide_scalar_with_nulls() { + let a = Int32Array::from(vec![Some(15), None, Some(8), Some(1), Some(9), None]); + let b = 3; + let c = divide_scalar(&a, b).unwrap(); + let expected = + Int32Array::from(vec![Some(5), None, Some(2), Some(0), Some(3), None]); + assert_eq!(c, expected); + } + #[test] fn test_primitive_array_divide_with_nulls_sliced() { let a = Int32Array::from(vec![