Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ criterion = "0.3"
flate2 = "1"
tempfile = "3"

[[bench]]
name = "aggregate_kernels"
harness = false

[[bench]]
name = "array_from_vec"
harness = false
Expand Down
68 changes: 68 additions & 0 deletions rust/arrow/benches/aggregate_kernels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use criterion::Criterion;

use rand::Rng;
use std::sync::Arc;

extern crate arrow;

use arrow::array::*;
use arrow::compute::kernels::aggregate::*;

fn create_array(size: usize, with_nulls: bool) -> ArrayRef {
// use random numbers to avoid spurious compiler optimizations wrt to branching
let mut rng = rand::thread_rng();
let mut builder = Float32Builder::new(size);

for _ in 0..size {
if with_nulls && rng.gen::<f32>() > 0.5 {
builder.append_null().unwrap();
} else {
builder.append_value(rng.gen()).unwrap();
}
}
Arc::new(builder.finish())
}

fn bench_sum(arr_a: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(sum(&arr_a).unwrap());
}

fn bench_min(arr_a: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(min(&arr_a).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
let arr_a = create_array(512, false);

c.bench_function("sum 512", |b| b.iter(|| bench_sum(&arr_a)));
c.bench_function("min 512", |b| b.iter(|| bench_min(&arr_a)));

let arr_a = create_array(512, true);

c.bench_function("sum nulls 512", |b| b.iter(|| bench_sum(&arr_a)));
c.bench_function("min nulls 512", |b| b.iter(|| bench_min(&arr_a)));
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
63 changes: 31 additions & 32 deletions rust/arrow/benches/arithmetic_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,59 +24,58 @@ use std::sync::Arc;
extern crate arrow;

use arrow::array::*;
use arrow::compute::kernels::aggregate::*;
use arrow::compute::kernels::arithmetic::*;
use arrow::compute::kernels::limit::*;

fn create_array(size: usize) -> Float32Array {
fn create_array(size: usize) -> ArrayRef {
let mut builder = Float32Builder::new(size);
for _i in 0..size {
builder.append_value(1.0).unwrap();
for i in 0..size {
builder.append_value(1.0 + 1.0 * i as f32).unwrap();
}
builder.finish()
Arc::new(builder.finish())
}

fn bench_add(size: usize) {
let arr_a = create_array(size);
let arr_b = create_array(size);
criterion::black_box(add(&arr_a, &arr_b).unwrap());
fn bench_add(arr_a: &ArrayRef, arr_b: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
let arr_b = arr_b.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(add(arr_a, arr_b).unwrap());
}

fn bench_subtract(size: usize) {
let arr_a = create_array(size);
let arr_b = create_array(size);
fn bench_subtract(arr_a: &ArrayRef, arr_b: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
let arr_b = arr_b.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(subtract(&arr_a, &arr_b).unwrap());
}

fn bench_multiply(size: usize) {
let arr_a = create_array(size);
let arr_b = create_array(size);
fn bench_multiply(arr_a: &ArrayRef, arr_b: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
let arr_b = arr_b.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(multiply(&arr_a, &arr_b).unwrap());
}

fn bench_divide(size: usize) {
let arr_a = create_array(size);
let arr_b = create_array(size);
fn bench_divide(arr_a: &ArrayRef, arr_b: &ArrayRef) {
let arr_a = arr_a.as_any().downcast_ref::<Float32Array>().unwrap();
let arr_b = arr_b.as_any().downcast_ref::<Float32Array>().unwrap();
criterion::black_box(divide(&arr_a, &arr_b).unwrap());
}

fn bench_sum(size: usize) {
let arr_a = create_array(size);
criterion::black_box(sum(&arr_a).unwrap());
}

fn bench_limit(size: usize, max: usize) {
let arr_a: ArrayRef = Arc::new(create_array(size));
criterion::black_box(limit(&arr_a, max).unwrap());
fn bench_limit(arr_a: &ArrayRef, max: usize) {
criterion::black_box(limit(arr_a, max).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
c.bench_function("add 512", |b| b.iter(|| bench_add(512)));
c.bench_function("subtract 512", |b| b.iter(|| bench_subtract(512)));
c.bench_function("multiply 512", |b| b.iter(|| bench_multiply(512)));
c.bench_function("divide 512", |b| b.iter(|| bench_divide(512)));
c.bench_function("sum 512", |b| b.iter(|| bench_sum(512)));
c.bench_function("limit 512, 512", |b| b.iter(|| bench_limit(512, 512)));
let arr_a = create_array(512);
let arr_b = create_array(512);

c.bench_function("add 512", |b| b.iter(|| bench_add(&arr_a, &arr_b)));
c.bench_function("subtract 512", |b| {
b.iter(|| bench_subtract(&arr_a, &arr_b))
});
c.bench_function("multiply 512", |b| {
b.iter(|| bench_multiply(&arr_a, &arr_b))
});
c.bench_function("divide 512", |b| b.iter(|| bench_divide(&arr_a, &arr_b)));
c.bench_function("limit 512, 512", |b| b.iter(|| bench_limit(&arr_a, 512)));
}

criterion_group!(benches, add_benchmark);
Expand Down
60 changes: 35 additions & 25 deletions rust/arrow/src/compute/kernels/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,51 @@ pub fn min<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
where
T: ArrowNumericType,
{
min_max_helper(array, |a, b| a < b)
min_max_helper(array, |a, b| a > b)
}

/// Returns the maximum value in the array, according to the natural order.
pub fn max<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
where
T: ArrowNumericType,
{
min_max_helper(array, |a, b| a > b)
min_max_helper(array, |a, b| a < b)
Comment thread
jorgecarleitao marked this conversation as resolved.
}

/// Helper function to perform min/max lambda function on values from a numeric array.
fn min_max_helper<T, F>(array: &PrimitiveArray<T>, cmp: F) -> Option<T::Native>
where
T: ArrowNumericType,
F: Fn(T::Native, T::Native) -> bool,
F: Fn(&T::Native, &T::Native) -> bool,
{
let mut n: Option<T::Native> = None;
let null_count = array.null_count();

if null_count == array.len() {
return None;
}

let mut n: T::Native = T::default_value();
let mut has_value = false;
let data = array.data();
for i in 0..data.len() {
if data.is_null(i) {
continue;
let m = array.value_slice(0, data.len());

if null_count == 0 {
// optimized path for arrays without null values
for item in m {
if !has_value || cmp(&n, item) {
has_value = true;
n = *item
}
}
let m = array.value(i);
match n {
None => n = Some(m),
Some(nn) => {
if cmp(m, nn) {
n = Some(m)
}
} else {
for (i, item) in m.iter().enumerate() {
if !has_value || data.is_valid(i) && cmp(&n, item) {
has_value = true;
n = *item
}
}
}
n
Some(n)
}

/// Returns the sum of values in the array.
Expand All @@ -74,27 +85,26 @@ where
let null_count = array.null_count();

if null_count == array.len() {
None
} else if null_count == 0 {
return None;
}

let mut n: T::Native = T::default_value();
let data = array.data();
let m = array.value_slice(0, data.len());

if null_count == 0 {
// optimized path for arrays without null values
let mut n: T::Native = T::default_value();
let data = array.data();
let m = array.value_slice(0, data.len());
for item in m.iter().take(data.len()) {
n = n + *item;
}
Some(n)
} else {
let mut n: T::Native = T::default_value();
let data = array.data();
let m = array.value_slice(0, data.len());
for (i, item) in m.iter().enumerate() {
if data.is_valid(i) {
n = n + *item;
}
}
Some(n)
}
Some(n)
}

#[cfg(test)]
Expand Down