Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion rust/arrow/benches/arithmetic_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
#[macro_use]
extern crate criterion;
use criterion::Criterion;
use rand::Rng;

use std::sync::Arc;

extern crate arrow;

use arrow::compute::kernels::arithmetic::*;
use arrow::compute::kernels::limit::*;
use arrow::util::bench_util::*;
use arrow::{array::*, datatypes::Float32Type};
use arrow::{compute::kernels::arithmetic::*, util::test_util::seedable_rng};

fn create_array(size: usize, with_nulls: bool) -> ArrayRef {
let null_density = if with_nulls { 0.5 } else { 0.0 };
Expand Down Expand Up @@ -58,13 +59,19 @@ fn bench_divide(arr_a: &ArrayRef, arr_b: &ArrayRef) {
criterion::black_box(divide(&arr_a, &arr_b).unwrap());
}

/// Benchmark body for `divide_scalar`: downcasts `array` to a `Float32Array`,
/// divides it by `divisor` and hands the result to `black_box`.
///
/// Panics if `array` is not a `Float32Array` or if the kernel returns an error.
fn bench_divide_scalar(array: &ArrayRef, divisor: f32) {
    let floats = array
        .as_any()
        .downcast_ref::<Float32Array>()
        .unwrap();
    let quotient = divide_scalar(floats, divisor).unwrap();
    criterion::black_box(quotient);
}

/// Benchmark body for the `limit` kernel: applies `limit` to `arr_a` with the
/// given `max` and passes the result through `black_box`.
fn bench_limit(arr_a: &ArrayRef, max: usize) {
    let limited = limit(arr_a, max);
    criterion::black_box(limited);
}

fn add_benchmark(c: &mut Criterion) {
let arr_a = create_array(512, false);
let arr_b = create_array(512, false);
let scalar = seedable_rng().gen();

c.bench_function("add 512", |b| b.iter(|| bench_add(&arr_a, &arr_b)));
c.bench_function("subtract 512", |b| {
Expand All @@ -74,6 +81,9 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_multiply(&arr_a, &arr_b))
});
c.bench_function("divide 512", |b| b.iter(|| bench_divide(&arr_a, &arr_b)));
c.bench_function("divide_scalar 512", |b| {
b.iter(|| bench_divide_scalar(&arr_a, scalar))
});
c.bench_function("limit 512, 512", |b| b.iter(|| bench_limit(&arr_a, 512)));

let arr_a_nulls = create_array(512, false);
Expand All @@ -84,6 +94,9 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("divide_nulls_512", |b| {
b.iter(|| bench_divide(&arr_a_nulls, &arr_b_nulls))
});
c.bench_function("divide_scalar_nulls_512", |b| {
b.iter(|| bench_divide_scalar(&arr_a_nulls, scalar))
});
}

criterion_group!(benches, add_benchmark);
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl Buffer {

/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors
/// if any of the items of the iterator is an error.
/// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
/// # Safety
/// This method assumes that the iterator's size is correct and is undefined behavior
/// to use it on an iterator that reports an incorrect length.
Expand Down
150 changes: 147 additions & 3 deletions rust/arrow/src/compute/kernels/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,34 @@ where
Ok(PrimitiveArray::<T>::from(Arc::new(data)))
}

/// Non-SIMD kernel that divides every value of `array` by the scalar `divisor`.
///
/// This is the implementation `divide_scalar` dispatches to when the `simd` cfg is off.
///
/// The input's null buffer is reused unchanged for the result, so null slots stay null.
/// The values stored underneath null slots are divided like any other value.
///
/// # Errors
///
/// Returns `ArrowError::DivideByZero` if `divisor` is zero.
fn math_divide_scalar<T>(
    array: &PrimitiveArray<T>,
    divisor: T::Native,
) -> Result<PrimitiveArray<T>>
where
    T: ArrowNumericType,
    T::Native: Div<Output = T::Native> + Zero,
{
    // Reject a zero divisor once, up front, instead of per element.
    if divisor.is_zero() {
        return Err(ArrowError::DivideByZero);
    }

    let quotients = array.values().iter().map(|&dividend| dividend / divisor);
    // SAFETY: the iterator is a `map` over the `values()` slice iterator, so the
    // length it reports is exact, as `from_trusted_len_iter` requires.
    let value_buffer = unsafe { Buffer::from_trusted_len_iter(quotients) };

    // NOTE(review): the null buffer is cloned as-is while the result is built with
    // offset 0 — confirm this lines up with `values()` when `array` is a slice with
    // a non-zero offset.
    let null_buffer = array.data_ref().null_buffer().cloned();

    let data = ArrayData::new(
        T::DATA_TYPE,
        array.len(),
        None,
        null_buffer,
        0,
        vec![value_buffer],
        vec![],
    );
    Ok(PrimitiveArray::<T>::from(Arc::new(data)))
}

/// SIMD vectorized version of `math_op` above.
#[cfg(simd)]
fn simd_math_op<T, SIMD_OP, SCALAR_OP>(
Expand Down Expand Up @@ -387,9 +415,38 @@ where
Ok(())
}

/// SIMD vectorized version of `divide`, the divide kernel needs its own implementation as there
/// is a need to handle situations where a divide by `0` occurs. This is complicated by `NULL`
/// slots and padding.
/// Divides the trailing elements that did not fill a whole SIMD chunk by `divisor`.
///
/// `array_chunks` and `result_chunks` are the chunk iterators over the input values and
/// the output buffer; only their remainders (the elements left over after the exact
/// chunks) are read and written here, one element at a time.
///
/// # Errors
///
/// Returns `ArrowError::DivideByZero` if `divisor` is zero.
#[cfg(simd)]
#[inline]
fn simd_checked_divide_scalar_remainder<T: ArrowNumericType>(
    array_chunks: ChunksExact<T::Native>,
    divisor: T::Native,
    result_chunks: ChunksExactMut<T::Native>,
) -> Result<()>
where
    T::Native: Zero + Div<Output = T::Native>,
{
    if divisor.is_zero() {
        return Err(ArrowError::DivideByZero);
    }

    // `into_remainder` consumes the mutable chunk iterator and yields the tail of the
    // output buffer; `remainder` yields the matching tail of the input.
    let output_tail = result_chunks.into_remainder();
    let input_tail = array_chunks.remainder();

    for (quotient, &dividend) in output_tail.iter_mut().zip(input_tail) {
        *quotient = dividend / divisor;
    }

    Ok(())
}

/// SIMD vectorized version of `divide`.
///
/// The divide kernels need their own implementation as there is a need to handle situations
/// where a divide by `0` occurs. This is complicated by `NULL` slots and padding.
#[cfg(simd)]
fn simd_divide<T>(
left: &PrimitiveArray<T>,
Expand Down Expand Up @@ -506,6 +563,52 @@ where
Ok(PrimitiveArray::<T>::from(Arc::new(data)))
}

/// SIMD kernel that divides every value of `array` by the scalar `divisor`.
///
/// The values are processed in chunks of `T::lanes()` elements: each chunk is loaded
/// into a SIMD register, divided by a register filled with `divisor`, and written to
/// the output. Elements left over after the last full chunk are handled by
/// `simd_checked_divide_scalar_remainder`.
///
/// The input's null buffer is reused unchanged for the result.
///
/// # Errors
///
/// Returns `ArrowError::DivideByZero` if `divisor` is zero.
#[cfg(simd)]
fn simd_divide_scalar<T>(
    array: &PrimitiveArray<T>,
    divisor: T::Native,
) -> Result<PrimitiveArray<T>>
where
    T: ArrowNumericType,
    T::Native: One + Zero + Div<Output = T::Native>,
{
    if divisor.is_zero() {
        return Err(ArrowError::DivideByZero);
    }

    let lanes = T::lanes();
    let byte_len = array.len() * std::mem::size_of::<T::Native>();
    let mut output = MutableBuffer::new(byte_len).with_bitset(byte_len, false);

    let mut output_chunks = output.typed_data_mut().chunks_exact_mut(lanes);
    let mut input_chunks = array.values().chunks_exact(lanes);

    // Iterate through `borrow_mut()` so both chunk iterators stay usable afterwards,
    // when their remainders are needed.
    for (output_slice, input_slice) in
        output_chunks.borrow_mut().zip(input_chunks.borrow_mut())
    {
        let dividends = T::load(input_slice);
        let divisors = T::init(divisor);

        let quotients = T::bin_op(dividends, divisors, |a, b| a / b);
        T::write(quotients, output_slice);
    }

    // Finish the tail that did not fill a whole SIMD chunk.
    simd_checked_divide_scalar_remainder::<T>(input_chunks, divisor, output_chunks)?;

    let null_buffer = array.data_ref().null_buffer().cloned();
    let data = ArrayData::new(
        T::DATA_TYPE,
        array.len(),
        None,
        null_buffer,
        0,
        vec![output.into()],
        vec![],
    );
    Ok(PrimitiveArray::<T>::from(Arc::new(data)))
}

/// Perform `left + right` operation on two arrays. If either left or right value is null
/// then the result is also null.
pub fn add<T>(
Expand Down Expand Up @@ -622,6 +725,28 @@ where
return math_divide(&left, &right);
}

/// Divides each value of `array` by the scalar `divisor`.
///
/// A slot that is null in `array` is null in the result.
///
/// # Errors
///
/// Returns `Err(ArrowError::DivideByZero)` if `divisor` is zero.
pub fn divide_scalar<T>(
    array: &PrimitiveArray<T>,
    divisor: T::Native,
) -> Result<PrimitiveArray<T>>
where
    T: datatypes::ArrowNumericType,
    T::Native: Add<Output = T::Native>
        + Sub<Output = T::Native>
        + Mul<Output = T::Native>
        + Div<Output = T::Native>
        + Zero
        + One,
{
    // Exactly one of the two returns below survives cfg-stripping.
    #[cfg(not(simd))]
    return math_divide_scalar(array, divisor);
    #[cfg(simd)]
    return simd_divide_scalar(array, divisor);
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -709,6 +834,15 @@ mod tests {
assert_eq!(9, c.value(4));
}

// Dividing an `Int32Array` without nulls by 3; the expected values show the
// integer quotients (14 / 3 == 4, 8 / 3 == 2, 1 / 3 == 0).
#[test]
fn test_primitive_array_divide_scalar() {
    let dividends = Int32Array::from(vec![15, 14, 9, 8, 1]);
    let divisor = 3;

    let quotients = divide_scalar(&dividends, divisor).unwrap();

    assert_eq!(quotients, Int32Array::from(vec![5, 4, 3, 2, 0]));
}

#[test]
fn test_primitive_array_divide_sliced() {
let a = Int32Array::from(vec![0, 0, 0, 15, 15, 8, 1, 9, 0]);
Expand Down Expand Up @@ -740,6 +874,16 @@ mod tests {
assert_eq!(true, c.is_null(5));
}

// Dividing an `Int32Array` that contains nulls by 3: null slots must stay null
// and the remaining slots hold the integer quotients.
#[test]
fn test_primitive_array_divide_scalar_with_nulls() {
    let dividends =
        Int32Array::from(vec![Some(15), None, Some(8), Some(1), Some(9), None]);
    let divisor = 3;

    let quotients = divide_scalar(&dividends, divisor).unwrap();

    let expected_quotients =
        Int32Array::from(vec![Some(5), None, Some(2), Some(0), Some(3), None]);
    assert_eq!(quotients, expected_quotients);
}

#[test]
fn test_primitive_array_divide_with_nulls_sliced() {
let a = Int32Array::from(vec![
Expand Down