From 1ff882ae4b6ba90359b1a1dba16453572719c850 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 25 Jan 2026 22:24:13 +0800 Subject: [PATCH 1/5] perf array_remove --- datafusion/functions-nested/Cargo.toml | 4 + .../functions-nested/benches/array_remove.rs | 543 ++++++++++++++++++ datafusion/functions-nested/src/remove.rs | 126 ++-- 3 files changed, 613 insertions(+), 60 deletions(-) create mode 100644 datafusion/functions-nested/benches/array_remove.rs diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 6b0241a10a544..e6392207be894 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -84,3 +84,7 @@ name = "array_slice" [[bench]] harness = false name = "map" + +[[bench]] +harness = false +name = "array_remove" diff --git a/datafusion/functions-nested/benches/array_remove.rs b/datafusion/functions-nested/benches/array_remove.rs new file mode 100644 index 0000000000000..d09660aab46cd --- /dev/null +++ b/datafusion/functions-nested/benches/array_remove.rs @@ -0,0 +1,543 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, + FixedSizeBinaryArray, Float64Array, Int64Array, ListArray, StringArray, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::remove::ArrayRemove; +use rand::Rng; +use rand::rngs::StdRng; +use rand::SeedableRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 10000; +const ARRAY_SIZES: &[usize] = &[10, 100, 500]; +const SEED: u64 = 42; +const NULL_DENSITY: f64 = 0.1; + +fn criterion_benchmark(c: &mut Criterion) { + // Test array_remove with different data types and array sizes + bench_array_remove_int64(c); + bench_array_remove_f64(c); + bench_array_remove_strings(c); + bench_array_remove_binary(c); + bench_array_remove_boolean(c); + bench_array_remove_decimal64(c); + bench_array_remove_fixed_size_binary(c); +} + +fn bench_array_remove_int64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_int64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Int64(Some(1)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Int64, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_f64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_f64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_f64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Float64(Some(1.0)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Float64, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_strings(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_strings"); + + for &array_size in ARRAY_SIZES { + let list_array = create_string_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Utf8(Some("value_1".to_string())); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Utf8, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_binary(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_binary"); + + for &array_size in ARRAY_SIZES { + let list_array = create_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Binary(Some(b"value_1".to_vec())); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Binary, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_boolean(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_boolean"); + + for &array_size in ARRAY_SIZES { + let list_array = create_boolean_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Boolean(Some(true)); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Boolean, false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_decimal64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_decimal64"); + + for &array_size in ARRAY_SIZES { + let list_array = create_decimal64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::Decimal128(Some(100_i128), 10, 2); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::Decimal128(10, 2), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_array_remove_fixed_size_binary(c: &mut Criterion) { + let mut group = c.benchmark_group("array_remove_fixed_size_binary"); + + for &array_size in ARRAY_SIZES { + let list_array = create_fixed_size_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let element_to_remove = ScalarValue::FixedSizeBinary(16, Some(vec![1u8; 16])); + let args = create_args(list_array.clone(), element_to_remove.clone()); + + group.bench_with_input( + BenchmarkId::new("remove", array_size), + &array_size, + |b, _| { + let udf = ArrayRemove::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("arr", list_array.data_type().clone(), false) + .into(), + Field::new("el", DataType::FixedSizeBinary(16), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + list_array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn create_args(list_array: ArrayRef, element: ScalarValue) -> Vec { + vec![ + ColumnarValue::Array(list_array), + ColumnarValue::Scalar(element), + ] +} + +fn create_int64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..array_size as i64)) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_f64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0.0..array_size as f64)) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Float64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_string_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + let idx = rng.random_range(0..array_size); + Some(format!("value_{}", idx)) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Utf8, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_binary_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + let idx = rng.random_range(0..array_size); + Some(format!("value_{}", idx).into_bytes()) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Binary, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_boolean_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random::()) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Boolean, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_decimal64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..array_size) as i128 * 100) + } + }) + .collect::() + .with_precision_and_scale(10, 2) + .unwrap(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Decimal128(10, 2), true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_fixed_size_binary_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let mut buffer = Vec::with_capacity(num_rows * array_size * 16); + let mut null_buffer = Vec::with_capacity(num_rows * array_size); + for _ in 0..num_rows * array_size { + if rng.random::() < null_density { + null_buffer.push(false); + buffer.extend_from_slice(&[0u8; 16]); + } else { + null_buffer.push(true); + let mut bytes = [0u8; 16]; + rng.fill(&mut bytes); + buffer.extend_from_slice(&bytes); + } + } + let nulls = arrow::buffer::NullBuffer::from_iter(null_buffer.iter().copied()); + let values = FixedSizeBinaryArray::new(16, buffer.into(), Some(nulls)); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::FixedSizeBinary(16), true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index 41c06cb9c4cbf..b214c95b4e425 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -20,14 +20,14 @@ use crate::utils; use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, cast::AsArray, - new_empty_array, + cast::AsArray, make_array, Array, ArrayRef, Capacities, + GenericListArray, MutableArrayData, NullBufferBuilder, OffsetSizeTrait, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, FieldRef}; use datafusion_common::cast::as_int64_array; use datafusion_common::utils::ListCoercion; -use datafusion_common::{Result, exec_err, internal_err, utils::take_function_args}; +use datafusion_common::{exec_err, internal_err, utils::take_function_args, Result}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -377,73 +377,79 @@ fn general_remove( ); } }; - let data_type = list_field.data_type(); - let mut new_values = vec![]; - // Build up the offsets for the final output array + let original_data = list_array.values().to_data(); let mut offsets = Vec::::with_capacity(arr_n.len() + 1); offsets.push(OffsetSize::zero()); - // n is the number of elements to remove in this row - for (row_index, (list_array_row, n)) in - list_array.iter().zip(arr_n.iter()).enumerate() - { - match list_array_row { - Some(list_array_row) => { - let eq_array = utils::compare_element_to_list( - &list_array_row, - element_array, - row_index, - false, - )?; - - // We need to keep at most first n elements as `false`, which represent the elements to remove. - let eq_array = if eq_array.false_count() < *n as usize { - eq_array - } else { - let mut count = 0; - eq_array - .iter() - .map(|e| { - // Keep first n `false` elements, and reverse other elements to `true`. - if let Some(false) = e { - if count < *n { - count += 1; - e - } else { - Some(true) - } - } else { - e - } - }) - .collect::() - }; - - let filtered_array = arrow::compute::filter(&list_array_row, &eq_array)?; - offsets.push( - offsets[row_index] + OffsetSize::usize_as(filtered_array.len()), - ); - new_values.push(filtered_array); - } - None => { - // Null element results in a null row (no new offsets) - offsets.push(offsets[row_index]); + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data], + false, + Capacities::Array(original_data.len()), + ); + let mut valid = NullBufferBuilder::new(list_array.len()); + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + if list_array.is_null(row_index) { + offsets.push(offsets[row_index]); + valid.append_null(); + continue; + } + + let start = offset_window[0].to_usize().unwrap(); + let end = offset_window[1].to_usize().unwrap(); + let n = arr_n[row_index]; + + let eq_array = utils::compare_element_to_list( + &list_array.value(row_index), + element_array, + row_index, + false, + )?; + + let false_count = eq_array.false_count(); + + // Fast path: no elements to remove, copy entire row + if false_count == 0 { + mutable.extend(0, start, end); + offsets.push(offsets[row_index] + OffsetSize::usize_as(end - start)); + valid.append_non_null(); + continue; + } + + let max_removals = n.min(false_count as i64); + let mut removed = 0i64; + let mut copied = 0usize; + let mut batch_start: Option = None; + for (i, keep) in eq_array.iter().enumerate() { + if keep == Some(false) && removed < max_removals { + // Flush pending batch before skipping this element + if let Some(bs) = batch_start { + mutable.extend(0, start + bs, start + i); + copied += i - bs; + batch_start = None; + } + removed += 1; + } else if batch_start.is_none() { + batch_start = Some(i); } } - } - let values = if new_values.is_empty() { - new_empty_array(data_type) - } else { - let new_values = new_values.iter().map(|x| x.as_ref()).collect::>(); - arrow::compute::concat(&new_values)? - }; + // Flush remaining batch + if let Some(bs) = batch_start { + mutable.extend(0, start + bs, start + eq_array.len()); + copied += eq_array.len() - bs; + } + + offsets.push(offsets[row_index] + OffsetSize::usize_as(copied)); + valid.append_non_null(); + } + let new_values = make_array(mutable.freeze()); Ok(Arc::new(GenericListArray::::try_new( Arc::clone(list_field), OffsetBuffer::new(offsets.into()), - values, - list_array.nulls().cloned(), + new_values, + valid.finish(), )?)) } From d6c3d1a0f6a9b379c184a1909aef908865cc0009 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 25 Jan 2026 22:26:45 +0800 Subject: [PATCH 2/5] cargo fmt --- .../functions-nested/benches/array_remove.rs | 55 ++++++++++++++----- datafusion/functions-nested/src/remove.rs | 6 +- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/datafusion/functions-nested/benches/array_remove.rs b/datafusion/functions-nested/benches/array_remove.rs index d09660aab46cd..0a8f297d18e82 100644 --- a/datafusion/functions-nested/benches/array_remove.rs +++ b/datafusion/functions-nested/benches/array_remove.rs @@ -19,8 +19,8 @@ extern crate criterion; use arrow::array::{ - Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, - FixedSizeBinaryArray, Float64Array, Int64Array, ListArray, StringArray, + Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, FixedSizeBinaryArray, + Float64Array, Int64Array, ListArray, StringArray, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, Field}; @@ -30,8 +30,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; use datafusion_functions_nested::remove::ArrayRemove; use rand::Rng; -use rand::rngs::StdRng; use rand::SeedableRng; +use rand::rngs::StdRng; use std::hint::black_box; use std::sync::Arc; @@ -302,7 +302,8 @@ fn bench_array_remove_fixed_size_binary(c: &mut Criterion) { let mut group = c.benchmark_group("array_remove_fixed_size_binary"); for &array_size in ARRAY_SIZES { - let list_array = create_fixed_size_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let list_array = + create_fixed_size_binary_list_array(NUM_ROWS, array_size, NULL_DENSITY); let element_to_remove = ScalarValue::FixedSizeBinary(16, Some(vec![1u8; 16])); let args = create_args(list_array.clone(), element_to_remove.clone()); @@ -347,7 +348,11 @@ fn create_args(list_array: ArrayRef, element: ScalarValue) -> Vec ] } -fn create_int64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_int64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -373,7 +378,11 @@ fn create_int64_list_array(num_rows: usize, array_size: usize, null_density: f64 ) } -fn create_f64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_f64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -399,7 +408,11 @@ fn create_f64_list_array(num_rows: usize, array_size: usize, null_density: f64) ) } -fn create_string_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_string_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -407,7 +420,7 @@ fn create_string_list_array(num_rows: usize, array_size: usize, null_density: f6 None } else { let idx = rng.random_range(0..array_size); - Some(format!("value_{}", idx)) + Some(format!("value_{idx}")) } }) .collect::(); @@ -426,7 +439,11 @@ fn create_string_list_array(num_rows: usize, array_size: usize, null_density: f6 ) } -fn create_binary_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_binary_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -434,7 +451,7 @@ fn create_binary_list_array(num_rows: usize, array_size: usize, null_density: f6 None } else { let idx = rng.random_range(0..array_size); - Some(format!("value_{}", idx).into_bytes()) + Some(format!("value_{idx}").into_bytes()) } }) .collect::(); @@ -453,7 +470,11 @@ fn create_binary_list_array(num_rows: usize, array_size: usize, null_density: f6 ) } -fn create_boolean_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_boolean_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -479,7 +500,11 @@ fn create_boolean_list_array(num_rows: usize, array_size: usize, null_density: f ) } -fn create_decimal64_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_decimal64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let values = (0..num_rows * array_size) .map(|_| { @@ -507,7 +532,11 @@ fn create_decimal64_list_array(num_rows: usize, array_size: usize, null_density: ) } -fn create_fixed_size_binary_list_array(num_rows: usize, array_size: usize, null_density: f64) -> ArrayRef { +fn create_fixed_size_binary_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { let mut rng = StdRng::seed_from_u64(SEED); let mut buffer = Vec::with_capacity(num_rows * array_size * 16); let mut null_buffer = Vec::with_capacity(num_rows * array_size); diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index b214c95b4e425..d0497efe54151 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -20,14 +20,14 @@ use crate::utils; use crate::utils::make_scalar_function; use arrow::array::{ - cast::AsArray, make_array, Array, ArrayRef, Capacities, - GenericListArray, MutableArrayData, NullBufferBuilder, OffsetSizeTrait, + Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder, + OffsetSizeTrait, cast::AsArray, make_array, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, FieldRef}; use datafusion_common::cast::as_int64_array; use datafusion_common::utils::ListCoercion; -use datafusion_common::{exec_err, internal_err, utils::take_function_args, Result}; +use datafusion_common::{Result, exec_err, internal_err, utils::take_function_args}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, From 4d2fdc7bb2747cdba823a489acd8447ffb940572 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 25 Jan 2026 22:35:08 +0800 Subject: [PATCH 3/5] nit --- datafusion/functions-nested/src/remove.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index d0497efe54151..71d35ef13baf8 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -378,6 +378,7 @@ fn general_remove( } }; let original_data = list_array.values().to_data(); + // Build up the offsets for the final output array let mut offsets = Vec::::with_capacity(arr_n.len() + 1); offsets.push(OffsetSize::zero()); @@ -397,8 +398,10 @@ fn general_remove( let start = offset_window[0].to_usize().unwrap(); let end = offset_window[1].to_usize().unwrap(); + // n is the number of elements to remove in this row let n = arr_n[row_index]; + // compare each element in the list, `false` means the element matches and should be removed let eq_array = utils::compare_element_to_list( &list_array.value(row_index), element_array, @@ -416,6 +419,7 @@ fn general_remove( continue; } + // Remove at most `n` matching elements let max_removals = n.min(false_count as i64); let mut removed = 0i64; let mut copied = 0usize; From 97551cfc370160d3488718743cfe6bb85580f2a9 Mon Sep 17 00:00:00 2001 From: lyne <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 26 Jan 2026 21:36:41 +0800 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Oleks V --- datafusion/functions-nested/src/remove.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index 71d35ef13baf8..1b9f67659a265 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -409,7 +409,7 @@ fn general_remove( false, )?; - let false_count = eq_array.false_count(); + let num_to_remove = eq_array.false_count(); // Fast path: no elements to remove, copy entire row if false_count == 0 { @@ -423,7 +423,8 @@ fn general_remove( let max_removals = n.min(false_count as i64); let mut removed = 0i64; let mut copied = 0usize; - let mut batch_start: Option = None; + // marks the beginning of a range of elements pending to be copied. + let mut pending_batch_to_retain: Option = None; for (i, keep) in eq_array.iter().enumerate() { if keep == Some(false) && removed < max_removals { // Flush pending batch before skipping this element From 0c4070c335d953c0688afee322ddad1b0719baba Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 26 Jan 2026 21:47:56 +0800 Subject: [PATCH 5/5] Apply suggestions --- .../functions-nested/benches/array_remove.rs | 1 + datafusion/functions-nested/src/remove.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-nested/benches/array_remove.rs b/datafusion/functions-nested/benches/array_remove.rs index 0a8f297d18e82..2be154a2f89a8 100644 --- a/datafusion/functions-nested/benches/array_remove.rs +++ b/datafusion/functions-nested/benches/array_remove.rs @@ -42,6 +42,7 @@ const NULL_DENSITY: f64 = 0.1; fn criterion_benchmark(c: &mut Criterion) { // Test array_remove with different data types and array sizes + // TODO: Add performance tests for nested datatypes bench_array_remove_int64(c); bench_array_remove_f64(c); bench_array_remove_strings(c); diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs index 1b9f67659a265..9e957c93e1c66 100644 --- a/datafusion/functions-nested/src/remove.rs +++ b/datafusion/functions-nested/src/remove.rs @@ -412,7 +412,7 @@ fn general_remove( let num_to_remove = eq_array.false_count(); // Fast path: no elements to remove, copy entire row - if false_count == 0 { + if num_to_remove == 0 { mutable.extend(0, start, end); offsets.push(offsets[row_index] + OffsetSize::usize_as(end - start)); valid.append_non_null(); @@ -420,27 +420,27 @@ fn general_remove( } // Remove at most `n` matching elements - let max_removals = n.min(false_count as i64); + let max_removals = n.min(num_to_remove as i64); let mut removed = 0i64; let mut copied = 0usize; - // marks the beginning of a range of elements pending to be copied. - let mut pending_batch_to_retain: Option = None; + // marks the beginning of a range of elements pending to be copied. + let mut pending_batch_to_retain: Option = None; for (i, keep) in eq_array.iter().enumerate() { if keep == Some(false) && removed < max_removals { // Flush pending batch before skipping this element - if let Some(bs) = batch_start { + if let Some(bs) = pending_batch_to_retain { mutable.extend(0, start + bs, start + i); copied += i - bs; - batch_start = None; + pending_batch_to_retain = None; } removed += 1; - } else if batch_start.is_none() { - batch_start = Some(i); + } else if pending_batch_to_retain.is_none() { + pending_batch_to_retain = Some(i); } } // Flush remaining batch - if let Some(bs) = batch_start { + if let Some(bs) = pending_batch_to_retain { mutable.extend(0, start + bs, start + eq_array.len()); copied += eq_array.len() - bs; }