diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 6b0241a10a544..e6392207be894 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -84,3 +84,7 @@ name = "array_slice" [[bench]] harness = false name = "map" + +[[bench]] +harness = false +name = "array_remove" diff --git a/datafusion/functions-nested/benches/array_remove.rs b/datafusion/functions-nested/benches/array_remove.rs new file mode 100644 index 0000000000000..2be154a2f89a8 --- /dev/null +++ b/datafusion/functions-nested/benches/array_remove.rs @@ -0,0 +1,573 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[macro_use] +extern crate criterion; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, FixedSizeBinaryArray, + Float64Array, Int64Array, ListArray, StringArray, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::remove::ArrayRemove; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 10000; +const ARRAY_SIZES: &[usize] = &[10, 100, 500]; +const SEED: u64 = 42; +const NULL_DENSITY: f64 = 0.1; + +fn criterion_benchmark(c: &mut Criterion) { + // Test array_remove with different data types and array sizes + // TODO: Add performance tests for nested datatypes + bench_array_remove_int64(c); + bench_array_remove_f64(c); + bench_array_remove_strings(c); + bench_array_remove_binary(c); + bench_array_remove_boolean(c); + bench_array_remove_decimal64(c); + bench_array_remove_fixed_size_binary(c); +} + +fn bench_array_remove_int64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_int64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Int64(Some(1)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Int64, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + 
list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_f64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_f64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_f64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Float64(Some(1.0)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Float64, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_strings(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_strings"); + + for &array_size in ARRAY_SIZES { + let list_array = create_string_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Utf8(Some("value_1".to_string())); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Utf8, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + 
.into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_binary(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_binary"); + + for &array_size in ARRAY_SIZES { + let list_array = create_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Binary(Some(b"value_1".to_vec())); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Binary, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_boolean(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_boolean"); + + for &array_size in ARRAY_SIZES { + let list_array = create_boolean_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Boolean(Some(true)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Boolean, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: 
Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_decimal64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_decimal64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_decimal64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Decimal128(Some(100_i128), 10, 2); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Decimal128(10, 2), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_fixed_size_binary(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_fixed_size_binary"); + + for &array_size in ARRAY_SIZES { + let list_array = + create_fixed_size_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::FixedSizeBinary(16, Some(vec![1u8; 16])); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::FixedSizeBinary(16), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + 
list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn create_args(list_array: ArrayRef, element: ScalarValue) -> Vec<ColumnarValue> { + vec![ + ColumnarValue::Array(list_array), + ColumnarValue::Scalar(element), + ] +} + +fn create_int64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + Some(rng.random_range(0..array_size as i64)) + } + }) + .collect::<Int64Array>(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_f64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + Some(rng.random_range(0.0..array_size as f64)) + } + }) + .collect::<Float64Array>(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Float64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_string_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + let idx = rng.random_range(0..array_size); + Some(format!("value_{idx}")) + } + }) + .collect::<StringArray>(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( +
ListArray::try_new( + Arc::new(Field::new("item", DataType::Utf8, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_binary_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + let idx = rng.random_range(0..array_size); + Some(format!("value_{idx}").into_bytes()) + } + }) + .collect::<BinaryArray>(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Binary, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_boolean_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + Some(rng.random::<bool>()) + } + }) + .collect::<BooleanArray>(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Boolean, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_decimal64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::<f64>() < null_density { + None + } else { + Some(rng.random_range(0..array_size) as i128 * 100) + } + }) + .collect::<Decimal128Array>() + .with_precision_and_scale(10, 2) + .unwrap(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Decimal128(10, 2), true)), + 
OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_fixed_size_binary_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let mut buffer = Vec::with_capacity(num_rows * array_size * 16); + let mut null_buffer = Vec::with_capacity(num_rows * array_size); + for _ in 0..num_rows * array_size { + if rng.random::<f64>() < null_density { + null_buffer.push(false); + buffer.extend_from_slice(&[0u8; 16]); + } else { + null_buffer.push(true); + let mut bytes = [0u8; 16]; + rng.fill(&mut bytes); + buffer.extend_from_slice(&bytes); + } + } + let nulls = arrow::buffer::NullBuffer::from_iter(null_buffer.iter().copied()); + let values = FixedSizeBinaryArray::new(16, buffer.into(), Some(nulls)); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::<Vec<i32>>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::FixedSizeBinary(16), true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index 41c06cb9c4cbf..9e957c93e1c66 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -20,8 +20,8 @@ use crate::utils; use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, cast::AsArray, - new_empty_array, + Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder, + OffsetSizeTrait, cast::AsArray, make_array, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, FieldRef}; @@ -377,73 +377,84 @@ fn general_remove( ); } }; - let data_type = list_field.data_type(); - let mut new_values = vec![]; + let original_data = list_array.values().to_data(); // 
Build up the offsets for the final output array let mut offsets = Vec::<OffsetSize>::with_capacity(arr_n.len() + 1); offsets.push(OffsetSize::zero()); - // n is the number of elements to remove in this row - for (row_index, (list_array_row, n)) in - list_array.iter().zip(arr_n.iter()).enumerate() - { - match list_array_row { - Some(list_array_row) => { - let eq_array = utils::compare_element_to_list( - &list_array_row, - element_array, - row_index, - false, - )?; - - // We need to keep at most first n elements as `false`, which represent the elements to remove. - let eq_array = if eq_array.false_count() < *n as usize { - eq_array - } else { - let mut count = 0; - eq_array - .iter() - .map(|e| { - // Keep first n `false` elements, and reverse other elements to `true`. - if let Some(false) = e { - if count < *n { - count += 1; - e - } else { - Some(true) - } - } else { - e - } - }) - .collect::<BooleanArray>() - }; - - let filtered_array = arrow::compute::filter(&list_array_row, &eq_array)?; - offsets.push( - offsets[row_index] + OffsetSize::usize_as(filtered_array.len()), - ); - new_values.push(filtered_array); - } - None => { - // Null element results in a null row (no new offsets) - offsets.push(offsets[row_index]); + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data], + false, + Capacities::Array(original_data.len()), + ); + let mut valid = NullBufferBuilder::new(list_array.len()); + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + if list_array.is_null(row_index) { + offsets.push(offsets[row_index]); + valid.append_null(); + continue; + } + + let start = offset_window[0].to_usize().unwrap(); + let end = offset_window[1].to_usize().unwrap(); + // n is the number of elements to remove in this row + let n = arr_n[row_index]; + + // compare each element in the list, `false` means the element matches and should be removed + let eq_array = utils::compare_element_to_list( + &list_array.value(row_index), + element_array, + row_index, + 
false, + )?; + + let num_to_remove = eq_array.false_count(); + + // Fast path: no elements to remove, copy entire row + if num_to_remove == 0 { + mutable.extend(0, start, end); + offsets.push(offsets[row_index] + OffsetSize::usize_as(end - start)); + valid.append_non_null(); + continue; + } + + // Remove at most `n` matching elements + let max_removals = n.min(num_to_remove as i64); + let mut removed = 0i64; + let mut copied = 0usize; + // marks the beginning of a range of elements pending to be copied. + let mut pending_batch_to_retain: Option<usize> = None; + for (i, keep) in eq_array.iter().enumerate() { + if keep == Some(false) && removed < max_removals { + // Flush pending batch before skipping this element + if let Some(bs) = pending_batch_to_retain { + mutable.extend(0, start + bs, start + i); + copied += i - bs; + pending_batch_to_retain = None; + } + removed += 1; + } else if pending_batch_to_retain.is_none() { + pending_batch_to_retain = Some(i); + } } - } - let values = if new_values.is_empty() { - new_empty_array(data_type) - } else { - let new_values = new_values.iter().map(|x| x.as_ref()).collect::<Vec<_>>(); - arrow::compute::concat(&new_values)? - }; + // Flush remaining batch + if let Some(bs) = pending_batch_to_retain { + mutable.extend(0, start + bs, start + eq_array.len()); + copied += eq_array.len() - bs; + } + + offsets.push(offsets[row_index] + OffsetSize::usize_as(copied)); + valid.append_non_null(); + } + let new_values = make_array(mutable.freeze()); Ok(Arc::new(GenericListArray::<OffsetSize>::try_new( Arc::clone(list_field), OffsetBuffer::new(offsets.into()), - values, - list_array.nulls().cloned(), + new_values, + valid.finish(), )?))