Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e9009e2
feat: add minimal array concatenation support to concat function
EeshanBembi Oct 20, 2025
fb53f81
fix: address clippy warnings in concat function
EeshanBembi Oct 20, 2025
80b6330
feat: add array concatenation support to concat function
EeshanBembi Oct 20, 2025
540b7b2
refactor: simplify concat function implementation
EeshanBembi Oct 20, 2025
dea374f
fix: optimize concat function for cooperative execution
EeshanBembi Oct 23, 2025
1d3b483
fix: address remaining clippy warnings and update config docs
EeshanBembi Oct 23, 2025
8d7f321
fix: remove duplicate content from configs.md causing doc build warnings
EeshanBembi Oct 24, 2025
70c379d
docs: update concat function documentation to include array support
EeshanBembi Oct 24, 2025
ca980cd
fix: address reviewer feedback for concat array support
EeshanBembi Oct 26, 2025
ec13867
feat: make concat function array support production-ready
EeshanBembi Nov 12, 2025
75f2c0a
fix: complete array concatenation support with Spark compatibility
EeshanBembi Nov 12, 2025
e5513c0
docs: update concat function documentation with array support
EeshanBembi Nov 12, 2025
e20a48b
feat: delegate array concatenation from concat to array_concat logic
EeshanBembi Nov 20, 2025
aeb50d3
fix: remove accidentally committed temp target.csv files
EeshanBembi Nov 20, 2025
6194796
feat: delegate concat array operations to shared implementation
EeshanBembi Nov 24, 2025
131def3
fix: restore missing align_array_dimensions function and Arc imports
EeshanBembi Nov 24, 2025
0439c40
chore: remove duplicated align_array_dimensions function and unused i…
EeshanBembi Nov 24, 2025
e56ba13
fix: remove manually added join docs to allow CI auto-generation
EeshanBembi Nov 24, 2025
b94ac36
chore: fix prettier formatting in configs.md
EeshanBembi Nov 24, 2025
3f387e2
docs: update configuration documentation via update_config_docs.sh
EeshanBembi Nov 24, 2025
de7fc97
feat: Add concat array support addressing all review comments
EeshanBembi Jan 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 7 additions & 104 deletions datafusion/functions-nested/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use std::any::Any;
use std::sync::Arc;

use crate::make_array::make_array_inner;
use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
use crate::utils::{check_datatypes, make_scalar_function};
use arrow::array::{
Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
NullBufferBuilder, OffsetSizeTrait,
Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, OffsetSizeTrait,
};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -330,7 +329,7 @@ impl ScalarUDFImpl for ArrayConcat {
&self,
args: datafusion_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
make_scalar_function(array_concat_inner)(&args.args)
make_scalar_function(datafusion_functions::utils::concat_arrays)(&args.args)
}

fn aliases(&self) -> &[String] {
Expand All @@ -352,106 +351,9 @@ impl ScalarUDFImpl for ArrayConcat {
}
}

fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("array_concat expects at least one argument");
}

let mut all_null = true;
let mut large_list = false;
for arg in args {
match arg.data_type() {
DataType::Null => continue,
DataType::LargeList(_) => large_list = true,
_ => (),
}
if arg.null_count() < arg.len() {
all_null = false;
}
}

if all_null {
// Return a null array with the same type as the first non-null-type argument
let return_type = args
.iter()
.map(|arg| arg.data_type())
.find_or_first(|d| !d.is_null())
.unwrap(); // Safe because args is non-empty

Ok(arrow::array::make_array(ArrayData::new_null(
return_type,
args[0].len(),
)))
} else if large_list {
concat_internal::<i64>(args)
} else {
concat_internal::<i32>(args)
}
}

fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
let args = align_array_dimensions::<O>(args.to_vec())?;

let list_arrays = args
.iter()
.map(|arg| as_generic_list_array::<O>(arg))
.collect::<Result<Vec<_>>>()?;
// Assume number of rows is the same for all arrays
let row_count = list_arrays[0].len();

let mut array_lengths = vec![];
let mut arrays = vec![];
let mut valid = NullBufferBuilder::new(row_count);
for i in 0..row_count {
let nulls = list_arrays
.iter()
.map(|arr| arr.is_null(i))
.collect::<Vec<_>>();

// If all the arrays are null, the concatenated array is null
let is_null = nulls.iter().all(|&x| x);
if is_null {
array_lengths.push(0);
valid.append_null();
} else {
// Get all the arrays on i-th row
let values = list_arrays
.iter()
.map(|arr| arr.value(i))
.collect::<Vec<_>>();

let elements = values
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();

// Concatenated array on i-th row
let concatenated_array = arrow::compute::concat(elements.as_slice())?;
array_lengths.push(concatenated_array.len());
arrays.push(concatenated_array);
valid.append_non_null();
}
}
// Assume all arrays have the same data type
let data_type = list_arrays[0].value_type();

let elements = arrays
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();

let list_arr = GenericListArray::<O>::new(
Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(array_lengths),
Arc::new(arrow::compute::concat(elements.as_slice())?),
valid.finish(),
);

Ok(Arc::new(list_arr))
}

// Kernel functions

/// Array_append SQL function
fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let [array, values] = take_function_args("array_append", args)?;
match array.data_type() {
Expand All @@ -462,6 +364,7 @@ fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// Array_prepend SQL function
fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let [values, array] = take_function_args("array_prepend", args)?;
match array.data_type() {
Expand Down Expand Up @@ -492,8 +395,8 @@ where
};

let res = match list_array.value_type() {
DataType::List(_) => concat_internal::<O>(args)?,
DataType::LargeList(_) => concat_internal::<O>(args)?,
DataType::List(_) => datafusion_functions::utils::concat_arrays(args)?,
DataType::LargeList(_) => datafusion_functions::utils::concat_arrays(args)?,
data_type => {
return generic_append_and_prepend::<O>(
list_array,
Expand Down
104 changes: 2 additions & 102 deletions datafusion/functions-nested/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@

//! array function utils

use std::sync::Arc;
use arrow::datatypes::{DataType, Fields};

use arrow::datatypes::{DataType, Field, Fields};

use arrow::array::{
Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar, UInt32Array,
};
use arrow::buffer::OffsetBuffer;
use arrow::array::{Array, ArrayRef, BooleanArray, Scalar, UInt32Array};
use datafusion_common::cast::{
as_fixed_size_list_array, as_large_list_array, as_list_array,
};
Expand Down Expand Up @@ -82,44 +77,6 @@ where
}
}

pub(crate) fn align_array_dimensions<O: OffsetSizeTrait>(
args: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {
let args_ndim = args
.iter()
.map(|arg| datafusion_common::utils::list_ndims(arg.data_type()))
.collect::<Vec<_>>();
let max_ndim = args_ndim.iter().max().unwrap_or(&0);

// Align the dimensions of the arrays
let aligned_args: Result<Vec<ArrayRef>> = args
.into_iter()
.zip(args_ndim.iter())
.map(|(array, ndim)| {
if ndim < max_ndim {
let mut aligned_array = Arc::clone(&array);
for _ in 0..(max_ndim - ndim) {
let data_type = aligned_array.data_type().to_owned();
let array_lengths = vec![1; aligned_array.len()];
let offsets = OffsetBuffer::<O>::from_lengths(array_lengths);

aligned_array = Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new_list_field(data_type, true)),
offsets,
aligned_array,
None,
)?)
}
Ok(aligned_array)
} else {
Ok(Arc::clone(&array))
}
})
.collect();

aligned_args
}

/// Computes a BooleanArray indicating equality or inequality between elements in a list array and a specified element array.
///
/// # Arguments
Expand Down Expand Up @@ -267,60 +224,3 @@ pub(crate) fn get_map_entry_field(data_type: &DataType) -> Result<&Fields> {
_ => internal_err!("Expected a Map type, got {data_type}"),
}
}

#[cfg(test)]
mod tests {
Comment thread
Jefffrey marked this conversation as resolved.
use super::*;
use arrow::array::ListArray;
use arrow::datatypes::Int64Type;
use datafusion_common::utils::SingleRowListArrayBuilder;

/// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt`
#[test]
fn test_align_array_dimensions() {
let array1d_1: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
Some(vec![Some(4), Some(5)]),
]));
let array1d_2: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(6), Some(7), Some(8)]),
]));

let array2d_1: ArrayRef = Arc::new(
SingleRowListArrayBuilder::new(Arc::clone(&array1d_1)).build_list_array(),
);
let array2d_2 = Arc::new(
SingleRowListArrayBuilder::new(Arc::clone(&array1d_2)).build_list_array(),
);

let res = align_array_dimensions::<i32>(vec![
array1d_1.to_owned(),
array2d_2.to_owned(),
])
.unwrap();

let expected = as_list_array(&array2d_1).unwrap();
let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type());
assert_ne!(as_list_array(&res[0]).unwrap(), expected);
assert_eq!(
datafusion_common::utils::list_ndims(res[0].data_type()),
expected_dim
);

let array3d_1: ArrayRef =
Arc::new(SingleRowListArrayBuilder::new(array2d_1).build_list_array());
let array3d_2: ArrayRef =
Arc::new(SingleRowListArrayBuilder::new(array2d_2).build_list_array());
let res = align_array_dimensions::<i32>(vec![array1d_1, array3d_2]).unwrap();

let expected = as_list_array(&array3d_1).unwrap();
let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type());
assert_ne!(as_list_array(&res[0]).unwrap(), expected);
assert_eq!(
datafusion_common::utils::list_ndims(res[0].data_type()),
expected_dim
);
}
}
Loading