Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 244 additions & 8 deletions datafusion/common/benches/with_hashes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

use ahash::RandomState;
use arrow::array::{
Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray,
NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, RunArray, StringViewArray,
StructArray, make_array,
Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray, Int32Array,
Int64Array, ListArray, MapArray, NullBufferBuilder, OffsetSizeTrait, PrimitiveArray,
RunArray, StringViewArray, StructArray, UnionArray, make_array,
};
use arrow::buffer::NullBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{
ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type,
ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type, UnionFields,
};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use datafusion_common::hash_utils::with_hashes;
Expand All @@ -40,6 +40,7 @@ const BATCH_SIZE: usize = 8192;
/// One benchmark input: a labelled array plus whether a nulled variant
/// of it can also be benchmarked.
struct BenchData {
/// Label used to build the Criterion benchmark names.
name: &'static str,
/// The array whose rows are hashed by `with_hashes`.
array: ArrayRef,
/// Union arrays can't have null bitmasks added
supports_nulls: bool,
}

Expand Down Expand Up @@ -78,6 +79,26 @@ fn criterion_benchmark(c: &mut Criterion) {
array: pool.dictionary_array::<Int32Type>(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "list_array",
array: list_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "map_array",
array: map_array(BATCH_SIZE),
supports_nulls: true,
},
BenchData {
name: "sparse_union",
array: sparse_union_array(BATCH_SIZE),
supports_nulls: false,
},
BenchData {
name: "dense_union",
array: dense_union_array(BATCH_SIZE),
supports_nulls: false,
},
BenchData {
name: "struct_array",
array: create_struct_array(&pool, BATCH_SIZE),
Expand All @@ -103,10 +124,9 @@ fn criterion_benchmark(c: &mut Criterion) {
let arrays = vec![array.clone(), array.clone(), array.clone()];
do_hash_test(b, &arrays);
});

// Union arrays can't have null bitmasks
if supports_nulls {
let nullable_array = add_nulls(&array);

c.bench_function(&format!("{name}: single, nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&nullable_array));
});
Expand Down Expand Up @@ -268,6 +288,222 @@ where
Arc::new(array)
}

/// Benchmark sliced arrays to demonstrate the optimization for when an array is
/// sliced, the underlying buffer may be much larger than what's referenced by
/// the slice. The optimization avoids hashing unreferenced elements.
fn sliced_array_benchmark(c: &mut Criterion) {
// Test with different slice ratios: slice_size / total_size
// Smaller ratio = more potential savings from the optimization
let slice_ratios = [10, 5, 2]; // 1/10, 1/5, 1/2 of total

for ratio in slice_ratios {
let total_rows = BATCH_SIZE * ratio;
let slice_offset = BATCH_SIZE * (ratio / 2); // Take from middle
let slice_len = BATCH_SIZE;

// Sliced ListArray
{
let full_array = list_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<ListArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("list_array_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}

// Sliced MapArray
{
let full_array = map_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("map_array_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}

// Sliced Sparse UnionArray
{
let full_array = sparse_union_array(total_rows);
let sliced: ArrayRef = Arc::new(
full_array
.as_any()
.downcast_ref::<UnionArray>()
.unwrap()
.slice(slice_offset, slice_len),
);
c.bench_function(
&format!("sparse_union_sliced: 1/{ratio} of {total_rows} rows"),
|b| {
do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
},
);
}
}
}

/// Drives `with_hashes` over `arrays` inside the bencher loop, verifying on
/// every iteration that the hash buffer covers exactly `expected_len` rows.
fn do_hash_test_with_len(b: &mut Bencher, arrays: &[ArrayRef], expected_len: usize) {
    let random_state = RandomState::new();
    b.iter(|| {
        let result = with_hashes(arrays, &random_state, |hashes| {
            assert_eq!(hashes.len(), expected_len);
            Ok(())
        });
        result.unwrap();
    });
}

/// Builds a `ListArray` of `num_rows` non-null rows, each holding five random
/// non-null `Int64` elements.
fn list_array(num_rows: usize) -> ArrayRef {
    const ELEMENTS_PER_ROW: usize = 5;
    let mut rng = make_rng();
    let total_elements = num_rows * ELEMENTS_PER_ROW;

    // Flat child buffer of random values shared by all rows.
    let values: Int64Array = std::iter::repeat_with(|| Some(rng.random::<i64>()))
        .take(total_elements)
        .collect();

    // Every row has the same length; derive the monotone offsets from lengths.
    let offsets = OffsetBuffer::from_lengths(std::iter::repeat(ELEMENTS_PER_ROW).take(num_rows));

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    Arc::new(ListArray::new(item_field, offsets, Arc::new(values), None))
}

/// Builds a `MapArray` of `num_rows` non-null rows, each holding five random
/// `Int32 -> Int64` entries.
fn map_array(num_rows: usize) -> ArrayRef {
    let mut rng = make_rng();
    let entries_per_row = 5;
    let total_entries = num_rows * entries_per_row;

    let keys: Int32Array = (0..total_entries)
        .map(|_| Some(rng.random::<i32>()))
        .collect();
    let values: Int64Array = (0..total_entries)
        .map(|_| Some(rng.random::<i64>()))
        .collect();
    let offsets: Vec<i32> = (0..=num_rows)
        .map(|i| (i * entries_per_row) as i32)
        .collect();

    // Define the entry schema once and reuse it for both the struct child and
    // the map's `entries` field, so the two declarations can never drift apart.
    let entry_fields = Fields::from(vec![
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int64, true),
    ]);

    let entries = StructArray::try_new(
        entry_fields.clone(),
        vec![Arc::new(keys), Arc::new(values)],
        None,
    )
    .unwrap();

    Arc::new(MapArray::new(
        Arc::new(Field::new(
            "entries",
            DataType::Struct(entry_fields),
            false,
        )),
        OffsetBuffer::new(ScalarBuffer::from(offsets)),
        entries,
        None,
        false, // keys are not guaranteed sorted
    ))
}

/// Builds a sparse `UnionArray` of `num_rows` rows spread randomly across five
/// `Int64` child fields. As sparse layout requires, every child carries
/// `num_rows` values even though each row selects only one of them.
fn sparse_union_array(num_rows: usize) -> ArrayRef {
    let mut rng = make_rng();
    let num_types: usize = 5;

    // One random type id per row.
    let type_ids: Vec<i8> = (0..num_rows)
        .map(|_| rng.random_range(0..num_types) as i8)
        .collect();

    let mut fields = Vec::with_capacity(num_types);
    let mut children = Vec::with_capacity(num_types);
    for i in 0..num_types {
        fields.push((
            i as i8,
            Arc::new(Field::new(format!("f{i}"), DataType::Int64, true)),
        ));
        children.push(primitive_array::<Int64Type>(num_rows));
    }

    let union = UnionArray::try_new(
        UnionFields::from_iter(fields),
        ScalarBuffer::from(type_ids),
        None, // no offsets buffer => sparse layout
        children,
    )
    .unwrap();
    Arc::new(union)
}

/// Builds a dense `UnionArray` of `num_rows` rows spread randomly across five
/// `Int64` child fields. Each child stores only the values belonging to it,
/// addressed through an offsets buffer.
fn dense_union_array(num_rows: usize) -> ArrayRef {
    let mut rng = make_rng();
    let num_types: usize = 5;

    let type_ids: Vec<i8> = (0..num_rows)
        .map(|_| rng.random_range(0..num_types) as i8)
        .collect();

    // Single pass: each row's offset is the number of earlier rows with the
    // same type id; when the loop finishes, the counters hold the per-type
    // totals, i.e. each child's required length.
    let mut next_offset = vec![0i32; num_types];
    let mut offsets = Vec::with_capacity(num_rows);
    for &tid in &type_ids {
        let slot = &mut next_offset[tid as usize];
        offsets.push(*slot);
        *slot += 1;
    }
    let type_counts = next_offset;

    let mut fields = Vec::with_capacity(num_types);
    let mut children = Vec::with_capacity(num_types);
    for i in 0..num_types {
        fields.push((
            i as i8,
            Arc::new(Field::new(format!("f{i}"), DataType::Int64, true)),
        ));
        children.push(primitive_array::<Int64Type>(type_counts[i] as usize));
    }

    Arc::new(
        UnionArray::try_new(
            UnionFields::from_iter(fields),
            ScalarBuffer::from(type_ids),
            Some(ScalarBuffer::from(offsets)),
            children,
        )
        .unwrap(),
    )
}

fn boolean_array(array_len: usize) -> ArrayRef {
let mut rng = make_rng();
Arc::new(
Expand Down Expand Up @@ -329,5 +565,5 @@ where
)
}

criterion_group!(benches, criterion_benchmark);
criterion_group!(benches, criterion_benchmark, sliced_array_benchmark);
criterion_main!(benches);
Loading