Skip to content
150 changes: 77 additions & 73 deletions rust/arrow/benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,137 +14,141 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
extern crate arrow;

use arrow::{compute::Filter, util::test_util::seedable_rng};
use rand::{
distributions::{Alphanumeric, Standard},
prelude::Distribution,
Rng,
};

use arrow::array::*;
use arrow::compute::{filter, FilterContext};
use arrow::compute::{build_filter, filter};
use arrow::datatypes::ArrowNumericType;
use arrow::datatypes::{Float32Type, UInt8Type};

use criterion::{criterion_group, criterion_main, Criterion};

fn create_primitive_array<T, F>(size: usize, value_fn: F) -> PrimitiveArray<T>
fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowNumericType,
F: Fn(usize) -> T::Native,
Standard: Distribution<T::Native>,
{
// use random numbers to avoid spurious compiler optimizations wrt to branching
let mut rng = seedable_rng();
let mut builder = PrimitiveArray::<T>::builder(size);
for i in 0..size {
builder.append_value(value_fn(i)).unwrap();

for _ in 0..size {
if rng.gen::<f32>() < null_density {
builder.append_null().unwrap();
} else {
builder.append_value(rng.gen()).unwrap();
}
}
builder.finish()
}

fn create_u8_array_with_nulls(size: usize) -> UInt8Array {
let mut builder = UInt8Builder::new(size);
for i in 0..size {
if i % 2 == 0 {
builder.append_value(1).unwrap();
} else {
fn create_string_array(size: usize, null_density: f32) -> StringArray {
// use random numbers to avoid spurious compiler optimizations wrt to branching
let mut rng = seedable_rng();
let mut builder = StringBuilder::new(size);

for _ in 0..size {
if rng.gen::<f32>() < null_density {
builder.append_null().unwrap();
} else {
let value = (&mut rng)
.sample_iter(&Alphanumeric)
.take(10)
.collect::<String>();
builder.append_value(&value).unwrap();
}
}
builder.finish()
}

fn create_bool_array<F>(size: usize, value_fn: F) -> BooleanArray
where
F: Fn(usize) -> bool,
{
fn create_bool_array(size: usize, trues_density: f32) -> BooleanArray {
let mut rng = seedable_rng();
let mut builder = BooleanBuilder::new(size);
for i in 0..size {
builder.append_value(value_fn(i)).unwrap();
for _ in 0..size {
let value = rng.gen::<f32>() < trues_density;
builder.append_value(value).unwrap();
}
builder.finish()
}

fn bench_filter_u8(data_array: &UInt8Array, filter_array: &BooleanArray) {
filter(
criterion::black_box(data_array),
criterion::black_box(filter_array),
)
.unwrap();
}

// fn bench_filter_f32(data_array: &Float32Array, filter_array: &BooleanArray) {
// filter(criterion::black_box(data_array), criterion::black_box(filter_array)).unwrap();
// }

fn bench_filter_context_u8(data_array: &UInt8Array, filter_context: &FilterContext) {
filter_context
.filter(criterion::black_box(data_array))
.unwrap();
fn bench_filter(data_array: &UInt8Array, filter_array: &BooleanArray) {
criterion::black_box(filter(data_array, filter_array).unwrap());
}

fn bench_filter_context_f32(data_array: &Float32Array, filter_context: &FilterContext) {
filter_context
.filter(criterion::black_box(data_array))
.unwrap();
fn bench_built_filter<'a>(filter: &Filter<'a>, data: &impl Array) {
criterion::black_box(filter(&data.data()));
}

fn add_benchmark(c: &mut Criterion) {
let size = 65536;
let filter_array = create_bool_array(size, |i| match i % 2 {
0 => true,
_ => false,
});
let sparse_filter_array = create_bool_array(size, |i| match i % 8000 {
0 => true,
_ => false,
});
let dense_filter_array = create_bool_array(size, |i| match i % 8000 {
0 => false,
_ => true,
});
let filter_array = create_bool_array(size, 0.5);
let sparse_filter_array = create_bool_array(size, 1.0 - 1.0 / 8000.0);
let dense_filter_array = create_bool_array(size, 1.0 / 8000.0);

let filter_context = FilterContext::new(&filter_array).unwrap();
let sparse_filter_context = FilterContext::new(&sparse_filter_array).unwrap();
let dense_filter_context = FilterContext::new(&dense_filter_array).unwrap();
let filter = build_filter(&filter_array).unwrap();
let sparse_filter = build_filter(&sparse_filter_array).unwrap();
let dense_filter = build_filter(&dense_filter_array).unwrap();

let data_array = create_primitive_array::<UInt8Type>(size, 0.0);

let data_array = create_primitive_array(size, |i| match i % 2 {
0 => 1,
_ => 0,
});
c.bench_function("filter u8 low selectivity", |b| {
b.iter(|| bench_filter_u8(&data_array, &filter_array))
b.iter(|| bench_filter(&data_array, &filter_array))
});
c.bench_function("filter u8 high selectivity", |b| {
b.iter(|| bench_filter_u8(&data_array, &sparse_filter_array))
b.iter(|| bench_filter(&data_array, &sparse_filter_array))
});
c.bench_function("filter u8 very low selectivity", |b| {
b.iter(|| bench_filter_u8(&data_array, &dense_filter_array))
b.iter(|| bench_filter(&data_array, &dense_filter_array))
});

c.bench_function("filter context u8 low selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &filter_context))
b.iter(|| bench_built_filter(&filter, &data_array))
});
c.bench_function("filter context u8 high selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context))
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});
c.bench_function("filter context u8 very low selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &dense_filter_context))
b.iter(|| bench_built_filter(&dense_filter, &data_array))
});

let data_array = create_u8_array_with_nulls(size);
let data_array = create_primitive_array::<UInt8Type>(size, 0.5);
c.bench_function("filter context u8 w NULLs low selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &filter_context))
b.iter(|| bench_built_filter(&filter, &data_array))
});
c.bench_function("filter context u8 w NULLs high selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &sparse_filter_context))
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});
c.bench_function("filter context u8 w NULLs very low selectivity", |b| {
b.iter(|| bench_filter_context_u8(&data_array, &dense_filter_context))
b.iter(|| bench_built_filter(&dense_filter, &data_array))
});

let data_array = create_primitive_array(size, |i| match i % 2 {
0 => 1.0,
_ => 0.0,
});
let data_array = create_primitive_array::<Float32Type>(size, 0.5);
c.bench_function("filter context f32 low selectivity", |b| {
b.iter(|| bench_filter_context_f32(&data_array, &filter_context))
b.iter(|| bench_built_filter(&filter, &data_array))
});
c.bench_function("filter context f32 high selectivity", |b| {
b.iter(|| bench_filter_context_f32(&data_array, &sparse_filter_context))
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});
c.bench_function("filter context f32 very low selectivity", |b| {
b.iter(|| bench_filter_context_f32(&data_array, &dense_filter_context))
b.iter(|| bench_built_filter(&dense_filter, &data_array))
});

let data_array = create_string_array(size, 0.5);
c.bench_function("filter context string low selectivity", |b| {
b.iter(|| bench_built_filter(&filter, &data_array))
});
c.bench_function("filter context string high selectivity", |b| {
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});
c.bench_function("filter context string very low selectivity", |b| {
b.iter(|| bench_built_filter(&dense_filter, &data_array))
});
}

Expand Down
3 changes: 3 additions & 0 deletions rust/arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ mod iterator;
mod null;
mod ord;
mod raw_pointer;
mod transform;

use crate::datatypes::*;

Expand Down Expand Up @@ -249,6 +250,8 @@ pub type DurationMillisecondBuilder = PrimitiveBuilder<DurationMillisecondType>;
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;

pub use self::transform::MutableArrayData;

// --------------------- Array Iterator ---------------------

pub use self::iterator::*;
Expand Down
50 changes: 50 additions & 0 deletions rust/arrow/src/array/transform/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::array::ArrayData;

use super::{
Extend, _MutableArrayData,
utils::{reserve_for_bits, set_bits},
};

pub(super) fn build_extend(array: &ArrayData) -> Extend {
let values = array.buffers()[0].data();
Box::new(
move |mutable: &mut _MutableArrayData, start: usize, len: usize| {
let buffer = &mut mutable.buffers[0];
reserve_for_bits(buffer, mutable.len + len);
set_bits(
&mut buffer.data_mut(),
values,
mutable.len,
array.offset() + start,
len,
);
},
)
}

pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
let buffer = &mut mutable.buffers[0];
reserve_for_bits(buffer, mutable.len + len);
}

pub(super) fn push_null(mutable: &mut _MutableArrayData) {
let buffer = &mut mutable.buffers[0];
reserve_for_bits(buffer, mutable.len + 1);
}
97 changes: 97 additions & 0 deletions rust/arrow/src/array/transform/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::{
array::{ArrayData, OffsetSizeTrait},
datatypes::ToByteSlice,
};

use super::{Extend, _MutableArrayData, utils::extend_offsets};

pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
let offsets = array.buffer::<T>(0);
if array.null_count() == 0 {
// fast case where we can copy regions without nullability checks
Box::new(
move |mutable: &mut _MutableArrayData, start: usize, len: usize| {
let mutable_offsets = mutable.buffer::<T>(0);
let last_offset = mutable_offsets[mutable_offsets.len() - 1];
// offsets
extend_offsets::<T>(
&mut mutable.buffers[0],
last_offset,
&offsets[start..start + len + 1],
);

mutable.child_data[0].extend(
offsets[start].to_usize().unwrap(),
offsets[start + len].to_usize().unwrap(),
)
},
)
} else {
// nulls present: append item by item, ignoring null entries
Box::new(
move |mutable: &mut _MutableArrayData, start: usize, len: usize| {
let mutable_offsets = mutable.buffer::<T>(0);
let mut last_offset = mutable_offsets[mutable_offsets.len() - 1];

let buffer = &mut mutable.buffers[0];
let delta_len = array.len() - array.null_count();
buffer.reserve(buffer.len() + delta_len * std::mem::size_of::<T>());

let child = &mut mutable.child_data[0];
(start..start + len).for_each(|i| {
if array.is_valid(i) {
// compute the new offset
last_offset = last_offset + offsets[i + 1] - offsets[i];

// append value
child.extend(
offsets[i].to_usize().unwrap(),
offsets[i + 1].to_usize().unwrap(),
);
}
// append offset
buffer.extend_from_slice(last_offset.to_byte_slice());
})
},
)
}
}

pub(super) fn extend_nulls<T: OffsetSizeTrait>(
mutable: &mut _MutableArrayData,
len: usize,
) {
let mutable_offsets = mutable.buffer::<T>(0);
let last_offset = mutable_offsets[mutable_offsets.len() - 1];

let offset_buffer = &mut mutable.buffers[0];

let offsets = vec![last_offset; len];
offset_buffer.extend_from_slice(offsets.to_byte_slice());
}

pub(super) fn push_null<T: OffsetSizeTrait>(mutable: &mut _MutableArrayData) {
let mutable_offsets = mutable.buffer::<T>(0);
let last_offset = mutable_offsets[mutable_offsets.len() - 1];

let offset_buffer = &mut mutable.buffers[0];

offset_buffer.extend_from_slice(last_offset.to_byte_slice());
}
Loading