From bebc566ffb1cfa04d3f7db991fbc6f3165dc90b9 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 16:30:52 +0800 Subject: [PATCH 01/14] support array_resize --- datafusion/common/src/utils.rs | 58 +++++++++++- datafusion/expr/src/built_in_function.rs | 9 ++ datafusion/expr/src/expr_fn.rs | 8 ++ .../physical-expr/src/array_expressions.rs | 94 ++++++++++++++++++- datafusion/physical-expr/src/functions.rs | 3 + datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 3 + datafusion/proto/src/generated/prost.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 6 ++ datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sqllogictest/test_files/array.slt | 24 +++++ 11 files changed, 207 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index 0a61fce15482d..b379fd8b2c8d0 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -25,7 +25,11 @@ use arrow::compute; use arrow::compute::{partition, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; -use arrow_array::{Array, LargeListArray, ListArray, RecordBatchOptions}; +use arrow_array::{ + Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, + Int8Array, LargeListArray, ListArray, NullArray, RecordBatchOptions, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, +}; use arrow_schema::DataType; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; @@ -492,6 +496,58 @@ pub fn list_ndims(data_type: &DataType) -> u64 { } } +/// Create an new empty array based on the given data type. +pub fn empty_list(data_type: &DataType) -> Result { + match data_type { + DataType::Boolean => Ok(Arc::new(BooleanArray::from(vec![None]))), + DataType::UInt8 => Ok(Arc::new(UInt8Array::from(vec![None]))), + DataType::UInt16 => Ok(Arc::new(UInt16Array::from(vec![None]))), + DataType::UInt32 => Ok(Arc::new(UInt32Array::from(vec![None]))), + DataType::UInt64 => Ok(Arc::new(UInt64Array::from(vec![None]))), + DataType::Int8 => Ok(Arc::new(Int8Array::from(vec![None]))), + DataType::Int16 => Ok(Arc::new(Int16Array::from(vec![None]))), + DataType::Int32 => Ok(Arc::new(Int32Array::from(vec![None]))), + DataType::Int64 => Ok(Arc::new(Int64Array::from(vec![None]))), + DataType::Float32 => Ok(Arc::new(Float32Array::from(vec![None]))), + DataType::Float64 => Ok(Arc::new(Float64Array::from(vec![None]))), + DataType::List(field) => { + let value = empty_list(field.data_type())?; + Ok(Arc::new(ListArray::try_new( + field.clone(), + OffsetBuffer::new(vec![0, 0].into()), + value, + None, + )?)) + } + DataType::LargeList(field) => { + let value = empty_list(field.data_type())?; + + Ok(Arc::new(LargeListArray::try_new( + field.clone(), + OffsetBuffer::new(vec![0, 0].into()), + value, + None, + )?)) + } + DataType::Null => Ok(Arc::new(NullArray::new(1))), + _ => _internal_err!("Unsupported data type for empty list: {:?}", data_type), + } +} + +// DataType::Boolean, +// DataType::UInt8, +// DataType::UInt16, +// DataType::UInt32, +// DataType::UInt64, +// DataType::Int8, +// DataType::Int16, +// DataType::Int32, +// DataType::Int64, +// DataType::Float32, +// DataType::Float64, +// DataType::Utf8, +// DataType::LargeUtf8, + /// An extension trait for smart pointers. Provides an interface to get a /// raw pointer to the data (with metadata stripped away). /// diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index e642dae06e4fd..6f64642f60d9b 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -187,6 +187,8 @@ pub enum BuiltinScalarFunction { ArrayExcept, /// cardinality Cardinality, + /// array_resize + ArrayResize, /// construct an array from columns MakeArray, /// Flatten @@ -430,6 +432,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayToString => Volatility::Immutable, BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable, BuiltinScalarFunction::ArrayUnion => Volatility::Immutable, + BuiltinScalarFunction::ArrayResize => Volatility::Immutable, BuiltinScalarFunction::Range => Volatility::Immutable, BuiltinScalarFunction::Cardinality => Volatility::Immutable, BuiltinScalarFunction::MakeArray => Volatility::Immutable, @@ -617,6 +620,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayReplaceN => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()), + BuiltinScalarFunction::ArrayResize => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArrayToString => Ok(Utf8), BuiltinScalarFunction::ArrayIntersect => { match (input_expr_types[0].clone(), input_expr_types[1].clone()) { @@ -980,6 +984,10 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()), BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()), BuiltinScalarFunction::Cardinality => Signature::any(1, self.volatility()), + BuiltinScalarFunction::ArrayResize => { + Signature::variadic_any(self.volatility()) + } + BuiltinScalarFunction::Range => Signature::one_of( vec![ Exact(vec![Int64]), @@ -1647,6 +1655,7 @@ impl BuiltinScalarFunction { ], BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], BuiltinScalarFunction::Cardinality => &["cardinality"], + BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"], BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], BuiltinScalarFunction::ArrayIntersect => { &["array_intersect", "list_intersect"] diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index cc8322272aa88..834420e413b0d 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -748,6 +748,14 @@ scalar_expr!( array, "returns the total number of elements in the array." ); + +scalar_expr!( + ArrayResize, + array_resize, + array size value, + "returns an array with the specified size filled with the given value." +); + nary_scalar_expr!( MakeArray, array, diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 9665116b04ab9..5d3d302546b28 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -18,6 +18,7 @@ //! Array expressions use std::any::type_name; +use std::cmp::min; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -26,7 +27,7 @@ use arrow::array::*; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::{DataType, Field, UInt64Type}; -use arrow::row::{RowConverter, SortField}; +use arrow::row::{Row, RowConverter, SortField}; use arrow_buffer::NullBuffer; use arrow_schema::{FieldRef, SortOptions}; @@ -34,7 +35,7 @@ use datafusion_common::cast::{ as_generic_list_array, as_generic_string_array, as_int64_array, as_large_list_array, as_list_array, as_null_array, as_string_array, }; -use datafusion_common::utils::{array_into_list_array, list_ndims}; +use datafusion_common::utils::{array_into_list_array, empty_list, list_ndims}; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, }; @@ -2611,6 +2612,95 @@ pub fn array_distinct(args: &[ArrayRef]) -> Result { } } +pub fn array_resize(arg: &[ArrayRef]) -> Result { + if arg.len() < 2 || arg.len() > 3 { + return exec_err!("array_resize needs two or three arguments"); + } + + let array = as_list_array(&arg[0])?; + let new_len = as_int64_array(&arg[1])?.value(0); + if new_len < 0 { + return exec_err!("array_resize: new length must be non-negative"); + } + + let new_element = if arg.len() == 3 { + Some(arg[2].clone()) + } else { + None + }; + + match &arg[0].data_type() { + DataType::List(field) => { + general_list_resize::(array, new_len, field, new_element) + } + _ => internal_err!("array_resize only support list array"), + } +} + +fn general_list_resize( + array: &GenericListArray, + count_array: i64, + field: &FieldRef, + new_element: Option, +) -> Result { + let mut offsets = vec![0]; + let mut new_arrays = vec![]; + + let dt = array.value_type(); + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let new_element = if let Some(new_element) = new_element { + new_element + } else { + empty_list(&dt)? + }; + let binding = converter.convert_columns(&[new_element.clone()])?; + let row = binding.row(0); + let count = count_array as usize; + + for arr in array.iter() { + match arr { + Some(arr) => { + let values = converter.convert_columns(&[arr])?; + let rows = values.iter().collect::>(); + + let remain_count = min(rows.len(), count); + let mut rows = rows[..remain_count].to_vec(); + let new_row = vec![&row; count - remain_count]; + rows.extend(new_row); + + let last_offset = offsets.last().copied().unwrap(); + offsets.push(last_offset + rows.len() as i32); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.first() { + Some(array) => array.clone(), + None => { + return internal_err!( + "array_distinct: failed to get array from rows" + ) + } + }; + new_arrays.push(array); + } + None => { + let last_offset = offsets.last().copied().unwrap(); + offsets.push(last_offset + count as i32); + new_arrays.push(new_element.clone()); + } + } + } + + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + + Ok(Arc::new(ListArray::try_new( + field.clone(), + OffsetBuffer::from(offsets.into()), + values, + None, + )?)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 53de858439190..66e22d2302de2 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -419,6 +419,9 @@ pub fn create_physical_fun( BuiltinScalarFunction::Cardinality => { Arc::new(|args| make_scalar_function(array_expressions::cardinality)(args)) } + BuiltinScalarFunction::ArrayResize => { + Arc::new(|args| make_scalar_function(array_expressions::array_resize)(args)) + } BuiltinScalarFunction::MakeArray => { Arc::new(|args| make_scalar_function(array_expressions::make_array)(args)) } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index d5f8397aa30cf..001290b9a23d8 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -667,6 +667,7 @@ enum ScalarFunction { FindInSet = 127; ArraySort = 128; ArrayDistinct = 129; + ArrayResize = 130; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 12e834d75adff..443120ce7324f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22332,6 +22332,7 @@ impl serde::Serialize for ScalarFunction { Self::FindInSet => "FindInSet", Self::ArraySort => "ArraySort", Self::ArrayDistinct => "ArrayDistinct", + Self::ArrayResize => "ArrayResize", }; serializer.serialize_str(variant) } @@ -22473,6 +22474,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "FindInSet", "ArraySort", "ArrayDistinct", + "ArrayResize", ]; struct GeneratedVisitor; @@ -22643,6 +22645,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "FindInSet" => Ok(ScalarFunction::FindInSet), "ArraySort" => Ok(ScalarFunction::ArraySort), "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct), + "ArrayResize" => Ok(ScalarFunction::ArrayResize), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 4ee0b70325ca7..72367fc946d32 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2754,6 +2754,7 @@ pub enum ScalarFunction { FindInSet = 127, ArraySort = 128, ArrayDistinct = 129, + ArrayResize = 130, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2892,6 +2893,7 @@ impl ScalarFunction { ScalarFunction::FindInSet => "FindInSet", ScalarFunction::ArraySort => "ArraySort", ScalarFunction::ArrayDistinct => "ArrayDistinct", + ScalarFunction::ArrayResize => "ArrayResize", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -3027,6 +3029,7 @@ impl ScalarFunction { "FindInSet" => Some(Self::FindInSet), "ArraySort" => Some(Self::ArraySort), "ArrayDistinct" => Some(Self::ArrayDistinct), + "ArrayResize" => Some(Self::ArrayResize), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 3f48be0c4d1f3..c11599412d94b 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -507,6 +507,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayToString => Self::ArrayToString, ScalarFunction::ArrayIntersect => Self::ArrayIntersect, ScalarFunction::ArrayUnion => Self::ArrayUnion, + ScalarFunction::ArrayResize => Self::ArrayResize, ScalarFunction::Range => Self::Range, ScalarFunction::Cardinality => Self::Cardinality, ScalarFunction::Array => Self::MakeArray, @@ -1499,6 +1500,11 @@ pub fn parse_expr( .map(|expr| parse_expr(expr, registry)) .collect::, _>>()?, )), + ScalarFunction::ArrayResize => Ok(array_slice( + parse_expr(&args[0], registry)?, + parse_expr(&args[1], registry)?, + parse_expr(&args[2], registry)?, + )), ScalarFunction::Sqrt => Ok(sqrt(parse_expr(&args[0], registry)?)), ScalarFunction::Cbrt => Ok(cbrt(parse_expr(&args[0], registry)?)), ScalarFunction::Sin => Ok(sin(parse_expr(&args[0], registry)?)), diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index b7d1ef225251d..ec9b886c1f222 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1485,6 +1485,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayPositions => Self::ArrayPositions, BuiltinScalarFunction::ArrayPrepend => Self::ArrayPrepend, BuiltinScalarFunction::ArrayRepeat => Self::ArrayRepeat, + BuiltinScalarFunction::ArrayResize => Self::ArrayResize, BuiltinScalarFunction::ArrayRemove => Self::ArrayRemove, BuiltinScalarFunction::ArrayRemoveN => Self::ArrayRemoveN, BuiltinScalarFunction::ArrayRemoveAll => Self::ArrayRemoveAll, diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index d864091a8588f..bca497c5efcdc 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4894,6 +4894,30 @@ select string_to_list(e, 'm') from values; [adipiscing] NULL +# array_resize scalar function #1 +query ? +select array_resize(make_array(1, 2, 3), 1); +---- +[1] + +# array_resize scalar function #2 +query ? +select array_resize(make_array(1, 2, 3), 5); +---- +[1, 2, 3, , ] + +# array_resize scalar function #3 +query ? +select array_resize(make_array(1, 2, 3), 5, 4); +---- +[1, 2, 3, 4, 4] + +# TODO: support column later +# # array_resize scalar function #4 +# query ? +# select array_resize(column1, column2, column3) from arrays_values; +# -- + ### Delete tables statement ok From 5bfee70709e45fe5c6e101f743418b81ad9b0557 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 16:59:23 +0800 Subject: [PATCH 02/14] support column --- .../physical-expr/src/array_expressions.rs | 36 +++++++++++-------- datafusion/sqllogictest/test_files/array.slt | 27 +++++++++++--- 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 5d3d302546b28..eb51abd558f73 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -27,8 +27,8 @@ use arrow::array::*; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::{DataType, Field, UInt64Type}; -use arrow::row::{Row, RowConverter, SortField}; -use arrow_buffer::NullBuffer; +use arrow::row::{RowConverter, SortField}; +use arrow_buffer::{ArrowNativeType, NullBuffer}; use arrow_schema::{FieldRef, SortOptions}; use datafusion_common::cast::{ @@ -2618,10 +2618,10 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { } let array = as_list_array(&arg[0])?; - let new_len = as_int64_array(&arg[1])?.value(0); - if new_len < 0 { - return exec_err!("array_resize: new length must be non-negative"); - } + let new_len = as_int64_array(&arg[1])?; + // if new_len < 0 { + // return exec_err!("array_resize: new length must be non-negative"); + // } let new_element = if arg.len() == 3 { Some(arg[2].clone()) @@ -2639,7 +2639,7 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { fn general_list_resize( array: &GenericListArray, - count_array: i64, + count_array: &Int64Array, field: &FieldRef, new_element: Option, ) -> Result { @@ -2653,11 +2653,19 @@ fn general_list_resize( } else { empty_list(&dt)? }; - let binding = converter.convert_columns(&[new_element.clone()])?; - let row = binding.row(0); - let count = count_array as usize; + let rows = converter.convert_columns(&[new_element.clone()])?; - for arr in array.iter() { + for (index, arr) in array.iter().enumerate() { + let count = count_array.value(index).to_usize().ok_or_else(|| { + DataFusionError::Execution( + "array_resize: failed to convert size to usize".to_string(), + ) + })?; + let row = if rows.size() > 0 { + rows.row(index) + } else { + rows.row(0) + }; match arr { Some(arr) => { let values = converter.convert_columns(&[arr])?; @@ -2675,7 +2683,7 @@ fn general_list_resize( Some(array) => array.clone(), None => { return internal_err!( - "array_distinct: failed to get array from rows" + "array_resize: failed to get array from rows" ) } }; @@ -2683,7 +2691,7 @@ fn general_list_resize( } None => { let last_offset = offsets.last().copied().unwrap(); - offsets.push(last_offset + count as i32); + offsets.push(last_offset); new_arrays.push(new_element.clone()); } } @@ -2695,7 +2703,7 @@ fn general_list_resize( Ok(Arc::new(ListArray::try_new( field.clone(), - OffsetBuffer::from(offsets.into()), + offsets, values, None, )?)) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index bca497c5efcdc..eaa010cabcdbe 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4912,11 +4912,28 @@ select array_resize(make_array(1, 2, 3), 5, 4); ---- [1, 2, 3, 4, 4] -# TODO: support column later -# # array_resize scalar function #4 -# query ? -# select array_resize(column1, column2, column3) from arrays_values; -# -- +# array_resize scalar function #4 +query error +select array_resize(make_array(1, 2, 3), -5, 2); + +# array_resize scalar function #5 +query ? +select array_resize(make_array(1.1, 2.2, 3.3), 10, 9.9); +---- +[1.1, 2.2, 3.3, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9] + +# array_resize scalar function #5 +query ? +select array_resize(column1, column2, column3) from arrays_values; +---- +[] +[11, 12, 13, 14, 15, 16, 17, 18, , 20, 2, 2] +[21, 22, 23, , 25, 26, 27, 28, 29, 30, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] +[31, 32, 33, 34, 35, , 37, 38, 39, 40, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4] +[] +[] +[1, 2, 3, 4, 5, 6, , 7, 51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] +[, , , , , , , , 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] ### Delete tables From 0cb9160c15019985335fad59a81bdf6751edd913 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 17:51:04 +0800 Subject: [PATCH 03/14] support LargeList --- .../physical-expr/src/array_expressions.rs | 16 +++---- datafusion/sqllogictest/test_files/array.slt | 43 +++++++++++++++++++ 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index eb51abd558f73..5ccd009a102bf 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2617,12 +2617,7 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { return exec_err!("array_resize needs two or three arguments"); } - let array = as_list_array(&arg[0])?; let new_len = as_int64_array(&arg[1])?; - // if new_len < 0 { - // return exec_err!("array_resize: new length must be non-negative"); - // } - let new_element = if arg.len() == 3 { Some(arg[2].clone()) } else { @@ -2631,8 +2626,13 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { match &arg[0].data_type() { DataType::List(field) => { + let array = as_list_array(&arg[0])?; general_list_resize::(array, new_len, field, new_element) } + DataType::LargeList(field) => { + let array = as_large_list_array(&arg[0])?; + general_list_resize::(array, new_len, field, new_element) + } _ => internal_err!("array_resize only support list array"), } } @@ -2643,7 +2643,7 @@ fn general_list_resize( field: &FieldRef, new_element: Option, ) -> Result { - let mut offsets = vec![0]; + let mut offsets = vec![O::usize_as(0)]; let mut new_arrays = vec![]; let dt = array.value_type(); @@ -2677,7 +2677,7 @@ fn general_list_resize( rows.extend(new_row); let last_offset = offsets.last().copied().unwrap(); - offsets.push(last_offset + rows.len() as i32); + offsets.push(last_offset + O::usize_as(rows.len())); let arrays = converter.convert_rows(rows)?; let array = match arrays.first() { Some(array) => array.clone(), @@ -2701,7 +2701,7 @@ fn general_list_resize( let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); let values = compute::concat(&new_arrays_ref)?; - Ok(Arc::new(ListArray::try_new( + Ok(Arc::new(GenericListArray::::try_new( field.clone(), offsets, values, diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index eaa010cabcdbe..2fe3bec5529af 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4900,18 +4900,33 @@ select array_resize(make_array(1, 2, 3), 1); ---- [1] +query ? +select array_resize(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), 1); +---- +[1] + # array_resize scalar function #2 query ? select array_resize(make_array(1, 2, 3), 5); ---- [1, 2, 3, , ] +query ? +select array_resize(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), 5); +---- +[1, 2, 3, , ] + # array_resize scalar function #3 query ? select array_resize(make_array(1, 2, 3), 5, 4); ---- [1, 2, 3, 4, 4] +query ? +select array_resize(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), 5, 4); +---- +[1, 2, 3, 4, 4] + # array_resize scalar function #4 query error select array_resize(make_array(1, 2, 3), -5, 2); @@ -4922,6 +4937,11 @@ select array_resize(make_array(1.1, 2.2, 3.3), 10, 9.9); ---- [1.1, 2.2, 3.3, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9] +query ? +select array_resize(arrow_cast(make_array(1.1, 2.2, 3.3), 'LargeList(Float64)'), 10, 9.9); +---- +[1.1, 2.2, 3.3, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9, 9.9] + # array_resize scalar function #5 query ? select array_resize(column1, column2, column3) from arrays_values; @@ -4935,6 +4955,29 @@ select array_resize(column1, column2, column3) from arrays_values; [1, 2, 3, 4, 5, 6, , 7, 51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] [, , , , , , , , 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] +query ? +select array_resize(arrow_cast(column1, 'LargeList(Int64)'), column2, column3) from arrays_values; +---- +[] +[11, 12, 13, 14, 15, 16, 17, 18, , 20, 2, 2] +[21, 22, 23, , 25, 26, 27, 28, 29, 30, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] +[31, 32, 33, 34, 35, , 37, 38, 39, 40, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4] +[] +[] +[1, 2, 3, 4, 5, 6, , 7, 51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] +[, , , , , , , , 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] + +# array_resize scalar function #5 +query ? +select array_resize([[1], [2], [3]], 10, [5]); +---- +[[1], [2], [3], [5], [5], [5], [5], [5], [5], [5]] + +query ? +select array_resize(arrow_cast([[1], [2], [3]], 'LargeList(List(Int64))'), 10, [5]); +---- +[[1], [2], [3], [5], [5], [5], [5], [5], [5], [5]] + ### Delete tables statement ok From e75ced5434a1c59f2f4efde11ce61800fa0a68cf Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 18:10:09 +0800 Subject: [PATCH 04/14] add function discription --- docs/source/user-guide/expressions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index b8689e5567415..3814b289bdd38 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -237,6 +237,7 @@ Unlike to some databases the math functions in Datafusion works the same way as | array_intersect(array1, array2) | Returns an array of the elements in the intersection of array1 and array2. `array_intersect([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | | array_union(array1, array2) | Returns an array of the elements in the union of array1 and array2 without duplicates. `array_union([1, 2, 3, 4], [5, 6, 3, 4]) -> [1, 2, 3, 4, 5, 6]` | | array_except(array1, array2) | Returns an array of the elements that appear in the first array but not in the second. `array_except([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | +| array_resize(array, size, value) | Resizes the list to contain size elements. Initializes new elements with value or empty if value is not set. | cardinality(array) | Returns the total number of elements in the array. `cardinality([[1, 2, 3], [4, 5, 6]]) -> 6` | | make_array(value1, [value2 [, ...]]) | Returns an Arrow array using the specified input expressions. `make_array(1, 2, 3) -> [1, 2, 3]` | | range(start [, stop, step]) | Returns an Arrow array between start and stop with step. `SELECT range(2, 10, 3) -> [2, 5, 8]` | From 9e4695d71fb8a18dc93acefede82f2c465908696 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 18:18:54 +0800 Subject: [PATCH 05/14] add example --- docs/source/user-guide/expressions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 3814b289bdd38..082998f694398 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -237,7 +237,7 @@ Unlike to some databases the math functions in Datafusion works the same way as | array_intersect(array1, array2) | Returns an array of the elements in the intersection of array1 and array2. `array_intersect([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | | array_union(array1, array2) | Returns an array of the elements in the union of array1 and array2 without duplicates. `array_union([1, 2, 3, 4], [5, 6, 3, 4]) -> [1, 2, 3, 4, 5, 6]` | | array_except(array1, array2) | Returns an array of the elements that appear in the first array but not in the second. `array_except([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | -| array_resize(array, size, value) | Resizes the list to contain size elements. Initializes new elements with value or empty if value is not set. +| array_resize(array, size, value) | Resizes the list to contain size elements. Initializes new elements with value or empty if value is not set. `array_resize([1, 2, 3], 5, 0) -> [1, 2, 3, 4, 5, 6]` | | cardinality(array) | Returns the total number of elements in the array. `cardinality([[1, 2, 3], [4, 5, 6]]) -> 6` | | make_array(value1, [value2 [, ...]]) | Returns an Arrow array using the specified input expressions. `make_array(1, 2, 3) -> [1, 2, 3]` | | range(start [, stop, step]) | Returns an Arrow array between start and stop with step. `SELECT range(2, 10, 3) -> [2, 5, 8]` | From 0bcffcafc6e4124db7f483ed56209d01996f8681 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 18:30:28 +0800 Subject: [PATCH 06/14] fix ci --- docs/source/user-guide/expressions.md | 2 +- node_modules/.package-lock.json | 6 ++++++ package-lock.json | 6 ++++++ package.json | 1 + 4 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 node_modules/.package-lock.json create mode 100644 package-lock.json create mode 100644 package.json diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 082998f694398..85322d9fa7669 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -237,7 +237,7 @@ Unlike to some databases the math functions in Datafusion works the same way as | array_intersect(array1, array2) | Returns an array of the elements in the intersection of array1 and array2. `array_intersect([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | | array_union(array1, array2) | Returns an array of the elements in the union of array1 and array2 without duplicates. `array_union([1, 2, 3, 4], [5, 6, 3, 4]) -> [1, 2, 3, 4, 5, 6]` | | array_except(array1, array2) | Returns an array of the elements that appear in the first array but not in the second. `array_except([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` | -| array_resize(array, size, value) | Resizes the list to contain size elements. Initializes new elements with value or empty if value is not set. `array_resize([1, 2, 3], 5, 0) -> [1, 2, 3, 4, 5, 6]` | +| array_resize(array, size, value) | Resizes the list to contain size elements. Initializes new elements with value or empty if value is not set. `array_resize([1, 2, 3], 5, 0) -> [1, 2, 3, 4, 5, 6]` | | cardinality(array) | Returns the total number of elements in the array. `cardinality([[1, 2, 3], [4, 5, 6]]) -> 6` | | make_array(value1, [value2 [, ...]]) | Returns an Arrow array using the specified input expressions. `make_array(1, 2, 3) -> [1, 2, 3]` | | range(start [, stop, step]) | Returns an Arrow array between start and stop with step. `SELECT range(2, 10, 3) -> [2, 5, 8]` | diff --git a/node_modules/.package-lock.json b/node_modules/.package-lock.json new file mode 100644 index 0000000000000..10e1d93ef64c8 --- /dev/null +++ b/node_modules/.package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "arrow-datafusion", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000000000..10e1d93ef64c8 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "arrow-datafusion", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/package.json b/package.json new file mode 100644 index 0000000000000..0967ef424bce6 --- /dev/null +++ b/package.json @@ -0,0 +1 @@ +{} From 24c15691a1f6801008ad928c4100acab148a2774 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 4 Jan 2024 20:31:03 +0800 Subject: [PATCH 07/14] remove useless files --- node_modules/.package-lock.json | 6 ------ package-lock.json | 6 ------ package.json | 1 - 3 files changed, 13 deletions(-) delete mode 100644 node_modules/.package-lock.json delete mode 100644 package-lock.json delete mode 100644 package.json diff --git a/node_modules/.package-lock.json b/node_modules/.package-lock.json deleted file mode 100644 index 10e1d93ef64c8..0000000000000 --- a/node_modules/.package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "arrow-datafusion", - "lockfileVersion": 3, - "requires": true, - "packages": {} -} diff --git a/package-lock.json b/package-lock.json deleted file mode 100644 index 10e1d93ef64c8..0000000000000 --- a/package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "arrow-datafusion", - "lockfileVersion": 3, - "requires": true, - "packages": {} -} diff --git a/package.json b/package.json deleted file mode 100644 index 0967ef424bce6..0000000000000 --- a/package.json +++ /dev/null @@ -1 +0,0 @@ -{} From 783936610647198b9051119b677d75fd56ce8a8a Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 5 Jan 2024 12:14:45 +0800 Subject: [PATCH 08/14] rename variable and improve error --- .../physical-expr/src/array_expressions.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 5ccd009a102bf..300ee94dc005e 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -37,7 +37,8 @@ use datafusion_common::cast::{ }; use datafusion_common::utils::{array_into_list_array, empty_list, list_ndims}; use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, + exec_datafusion_err, exec_err, internal_err, not_impl_err, plan_err, DataFusionError, + Result, }; use itertools::Itertools; @@ -2633,7 +2634,7 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { let array = as_large_list_array(&arg[0])?; general_list_resize::(array, new_len, field, new_element) } - _ => internal_err!("array_resize only support list array"), + _ => exec_err!("array_resize only support list array"), } } @@ -2641,25 +2642,23 @@ fn general_list_resize( array: &GenericListArray, count_array: &Int64Array, field: &FieldRef, - new_element: Option, + default_element: Option, ) -> Result { let mut offsets = vec![O::usize_as(0)]; let mut new_arrays = vec![]; let dt = array.value_type(); let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; - let new_element = if let Some(new_element) = new_element { - new_element + let default_element = if let Some(default_element) = default_element { + default_element } else { empty_list(&dt)? }; - let rows = converter.convert_columns(&[new_element.clone()])?; + let rows = converter.convert_columns(&[default_element.clone()])?; for (index, arr) in array.iter().enumerate() { let count = count_array.value(index).to_usize().ok_or_else(|| { - DataFusionError::Execution( - "array_resize: failed to convert size to usize".to_string(), - ) + exec_datafusion_err!("array_resize: failed to convert size to usize") })?; let row = if rows.size() > 0 { rows.row(index) @@ -2692,7 +2691,7 @@ fn general_list_resize( None => { let last_offset = offsets.last().copied().unwrap(); offsets.push(last_offset); - new_arrays.push(new_element.clone()); + new_arrays.push(default_element.clone()); } } } From a1e253b1644f8e010cc2cde70436c6bb434be83d Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 5 Jan 2024 13:55:36 +0800 Subject: [PATCH 09/14] clean comment --- datafusion/common/src/utils.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index b379fd8b2c8d0..52b149fe509ed 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -534,20 +534,6 @@ pub fn empty_list(data_type: &DataType) -> Result { } } -// DataType::Boolean, -// DataType::UInt8, -// DataType::UInt16, -// DataType::UInt32, -// DataType::UInt64, -// DataType::Int8, -// DataType::Int16, -// DataType::Int32, -// DataType::Int64, -// DataType::Float32, -// DataType::Float64, -// DataType::Utf8, -// DataType::LargeUtf8, - /// An extension trait for smart pointers. Provides an interface to get a /// raw pointer to the data (with metadata stripped away). /// From bce6b357f3b15c55c16c9a7e2e1a6d8b88c4a591 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 5 Jan 2024 13:59:54 +0800 Subject: [PATCH 10/14] rename variable --- datafusion/physical-expr/src/array_expressions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 300ee94dc005e..199d9bfd33b4b 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2660,7 +2660,7 @@ fn general_list_resize( let count = count_array.value(index).to_usize().ok_or_else(|| { exec_datafusion_err!("array_resize: failed to convert size to usize") })?; - let row = if rows.size() > 0 { + let default_value = if rows.size() > 0 { rows.row(index) } else { rows.row(0) @@ -2672,8 +2672,8 @@ fn general_list_resize( let remain_count = min(rows.len(), count); let mut rows = rows[..remain_count].to_vec(); - let new_row = vec![&row; count - remain_count]; - rows.extend(new_row); + let new_rows = vec![&default_value; count - remain_count]; + rows.extend(new_rows); let last_offset = offsets.last().copied().unwrap(); offsets.push(last_offset + O::usize_as(rows.len())); From f83258b63bb47194cc65cb7ac47404adbc4e385c Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 8 Jan 2024 13:21:42 +0800 Subject: [PATCH 11/14] improve error output --- datafusion/physical-expr/src/array_expressions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 199d9bfd33b4b..d1f9e813a286a 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2634,7 +2634,7 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { let array = as_large_list_array(&arg[0])?; general_list_resize::(array, new_len, field, new_element) } - _ => exec_err!("array_resize only support list array"), + array_type => exec_err!("array_resize does not support type '{array_type:?}'."), } } From cba3269a404d8f2397355598673f58598e5ae6f7 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 10 Jan 2024 14:14:03 +0800 Subject: [PATCH 12/14] use MutableArray --- datafusion/common/src/utils.rs | 44 +------ .../physical-expr/src/array_expressions.rs | 109 +++++++++--------- datafusion/sqllogictest/test_files/array.slt | 12 +- 3 files changed, 61 insertions(+), 104 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index 52b149fe509ed..0a61fce15482d 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -25,11 +25,7 @@ use arrow::compute; use arrow::compute::{partition, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; -use arrow_array::{ - Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, - Int8Array, LargeListArray, ListArray, NullArray, RecordBatchOptions, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, -}; +use arrow_array::{Array, LargeListArray, ListArray, RecordBatchOptions}; use arrow_schema::DataType; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; @@ -496,44 +492,6 @@ pub fn list_ndims(data_type: &DataType) -> u64 { } } -/// Create an new empty array based on the given data type. -pub fn empty_list(data_type: &DataType) -> Result { - match data_type { - DataType::Boolean => Ok(Arc::new(BooleanArray::from(vec![None]))), - DataType::UInt8 => Ok(Arc::new(UInt8Array::from(vec![None]))), - DataType::UInt16 => Ok(Arc::new(UInt16Array::from(vec![None]))), - DataType::UInt32 => Ok(Arc::new(UInt32Array::from(vec![None]))), - DataType::UInt64 => Ok(Arc::new(UInt64Array::from(vec![None]))), - DataType::Int8 => Ok(Arc::new(Int8Array::from(vec![None]))), - DataType::Int16 => Ok(Arc::new(Int16Array::from(vec![None]))), - DataType::Int32 => Ok(Arc::new(Int32Array::from(vec![None]))), - DataType::Int64 => Ok(Arc::new(Int64Array::from(vec![None]))), - DataType::Float32 => Ok(Arc::new(Float32Array::from(vec![None]))), - DataType::Float64 => Ok(Arc::new(Float64Array::from(vec![None]))), - DataType::List(field) => { - let value = empty_list(field.data_type())?; - Ok(Arc::new(ListArray::try_new( - field.clone(), - OffsetBuffer::new(vec![0, 0].into()), - value, - None, - )?)) - } - DataType::LargeList(field) => { - let value = empty_list(field.data_type())?; - - Ok(Arc::new(LargeListArray::try_new( - field.clone(), - OffsetBuffer::new(vec![0, 0].into()), - value, - None, - )?)) - } - DataType::Null => Ok(Arc::new(NullArray::new(1))), - _ => _internal_err!("Unsupported data type for empty list: {:?}", data_type), - } -} - /// An extension trait for smart pointers. Provides an interface to get a /// raw pointer to the data (with metadata stripped away). /// diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index d1f9e813a286a..e422d6980a835 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -18,7 +18,6 @@ //! Array expressions use std::any::type_name; -use std::cmp::min; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -35,10 +34,10 @@ use datafusion_common::cast::{ as_generic_list_array, as_generic_string_array, as_int64_array, as_large_list_array, as_list_array, as_null_array, as_string_array, }; -use datafusion_common::utils::{array_into_list_array, empty_list, list_ndims}; +use datafusion_common::utils::{array_into_list_array, list_ndims}; use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, not_impl_err, plan_err, DataFusionError, - Result, + Result, ScalarValue, }; use itertools::Itertools; @@ -1192,7 +1191,10 @@ pub fn array_concat(args: &[ArrayRef]) -> Result { } } - concat_internal::(new_args.as_slice()) + match &args[0].data_type() { + DataType::LargeList(_) => concat_internal::(new_args.as_slice()), + _ => concat_internal::(new_args.as_slice()), + } } /// Array_empty SQL function @@ -1241,7 +1243,7 @@ pub fn array_repeat(args: &[ArrayRef]) -> Result { let list_array = as_large_list_array(element)?; general_list_repeat::(list_array, count_array) } - _ => general_repeat(element, count_array), + _ => general_repeat::(element, count_array), } } @@ -1257,7 +1259,10 @@ pub fn array_repeat(args: &[ArrayRef]) -> Result { /// [1, 2, 3], [2, 0, 1] => [[1, 1], [], [3]] /// ) /// ``` -fn general_repeat(array: &ArrayRef, count_array: &Int64Array) -> Result { +fn general_repeat( + array: &ArrayRef, + count_array: &Int64Array, +) -> Result { let data_type = array.data_type(); let mut new_values = vec![]; @@ -1290,7 +1295,7 @@ fn general_repeat(array: &ArrayRef, count_array: &Int64Array) -> Result = new_values.iter().map(|a| a.as_ref()).collect(); let values = compute::concat(&new_values)?; - Ok(Arc::new(ListArray::try_new( + Ok(Arc::new(GenericListArray::::try_new( Arc::new(Field::new("item", data_type.to_owned(), true)), OffsetBuffer::from_lengths(count_vec), values, @@ -2613,6 +2618,7 @@ pub fn array_distinct(args: &[ArrayRef]) -> Result { } } +/// array_resize SQL function pub fn array_resize(arg: &[ArrayRef]) -> Result { if arg.len() < 2 || arg.len() > 3 { return exec_err!("array_resize needs two or three arguments"); @@ -2638,74 +2644,67 @@ pub fn array_resize(arg: &[ArrayRef]) -> Result { } } +/// array_resize keep the original array and append the default element to the end fn general_list_resize( array: &GenericListArray, count_array: &Int64Array, field: &FieldRef, default_element: Option, -) -> Result { - let mut offsets = vec![O::usize_as(0)]; - let mut new_arrays = vec![]; +) -> Result +where + O: TryInto, +{ + let mut default_element_array = vec![]; - let dt = array.value_type(); - let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let data_type = array.value_type(); let default_element = if let Some(default_element) = default_element { default_element } else { - empty_list(&dt)? + let null_scalar = ScalarValue::try_from(&data_type)?; + null_scalar.to_array_of_size(1)? }; - let rows = converter.convert_columns(&[default_element.clone()])?; - for (index, arr) in array.iter().enumerate() { - let count = count_array.value(index).to_usize().ok_or_else(|| { + // create a mutable array to store the original data + let values = array.values(); + let original_data = values.to_data(); + let capacity = Capacities::Array(original_data.len()); + let mut offsets = vec![O::usize_as(0)]; + let mut mutable = + MutableArrayData::with_capacities(vec![&original_data], false, capacity); + + for (row_index, offset_window) in array.offsets().windows(2).enumerate() { + let count = count_array.value(row_index).to_usize().ok_or_else(|| { exec_datafusion_err!("array_resize: failed to convert size to usize") })?; - let default_value = if rows.size() > 0 { - rows.row(index) + let count = O::usize_as(count); + let start = offset_window[0]; + let end = if start + count > offset_window[1] { + let value = (start + count - offset_window[1]).try_into().map_err(|_| { + exec_datafusion_err!("array_resize: failed to convert size to i64") + })?; + default_element_array.push(Some(value)); + offset_window[1] } else { - rows.row(0) + default_element_array.push(None); + start + count }; - match arr { - Some(arr) => { - let values = converter.convert_columns(&[arr])?; - let rows = values.iter().collect::>(); - - let remain_count = min(rows.len(), count); - let mut rows = rows[..remain_count].to_vec(); - let new_rows = vec![&default_value; count - remain_count]; - rows.extend(new_rows); - - let last_offset = offsets.last().copied().unwrap(); - offsets.push(last_offset + O::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => array.clone(), - None => { - return internal_err!( - "array_resize: failed to get array from rows" - ) - } - }; - new_arrays.push(array); - } - None => { - let last_offset = offsets.last().copied().unwrap(); - offsets.push(last_offset); - new_arrays.push(default_element.clone()); - } - } + mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap()); + offsets.push(offsets[row_index] + end - start); } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); - let values = compute::concat(&new_arrays_ref)?; + let default_element_array = Arc::new(Int64Array::from(default_element_array)); + let default_element_array = + general_repeat::(&default_element, &default_element_array)?; - Ok(Arc::new(GenericListArray::::try_new( + let data = mutable.freeze(); + let original_part = Arc::new(GenericListArray::::try_new( field.clone(), - offsets, - values, + OffsetBuffer::::new(offsets.into()), + arrow_array::make_array(data), None, - )?)) + )?); + + array_concat(&[original_part, default_element_array]) } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 2fe3bec5529af..822c4bcf60b6c 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4950,10 +4950,10 @@ select array_resize(column1, column2, column3) from arrays_values; [11, 12, 13, 14, 15, 16, 17, 18, , 20, 2, 2] [21, 22, 23, , 25, 26, 27, 28, 29, 30, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] [31, 32, 33, 34, 35, , 37, 38, 39, 40, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4] +[5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5] [] -[] -[1, 2, 3, 4, 5, 6, , 7, 51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] -[, , , , , , , , 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] +[51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] +[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] query ? select array_resize(arrow_cast(column1, 'LargeList(Int64)'), column2, column3) from arrays_values; @@ -4962,10 +4962,10 @@ select array_resize(arrow_cast(column1, 'LargeList(Int64)'), column2, column3) f [11, 12, 13, 14, 15, 16, 17, 18, , 20, 2, 2] [21, 22, 23, , 25, 26, 27, 28, 29, 30, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] [31, 32, 33, 34, 35, , 37, 38, 39, 40, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4] +[5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5] [] -[] -[1, 2, 3, 4, 5, 6, , 7, 51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] -[, , , , , , , , 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] +[51, 52, , 54, 55, 56, 57, 58, 59, 60, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ] +[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7] # array_resize scalar function #5 query ? From 6e50702f6773005686b49e1a70afbd0502c25acf Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 10 Jan 2024 16:37:05 +0800 Subject: [PATCH 13/14] refactor code and reduce extra function calls --- .../physical-expr/src/array_expressions.rs | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index e422d6980a835..f91a2f6bc3bbf 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2654,23 +2654,28 @@ fn general_list_resize( where O: TryInto, { - let mut default_element_array = vec![]; - let data_type = array.value_type(); + + let values = array.values(); + let original_data = values.to_data(); + + // create default element array let default_element = if let Some(default_element) = default_element { default_element } else { let null_scalar = ScalarValue::try_from(&data_type)?; - null_scalar.to_array_of_size(1)? + null_scalar.to_array_of_size(original_data.len())? }; + let default_value_data = default_element.to_data(); // create a mutable array to store the original data - let values = array.values(); - let original_data = values.to_data(); - let capacity = Capacities::Array(original_data.len()); + let capacity = Capacities::Array(original_data.len() + default_value_data.len()); let mut offsets = vec![O::usize_as(0)]; - let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &default_value_data], + false, + capacity, + ); for (row_index, offset_window) in array.offsets().windows(2).enumerate() { let count = count_array.value(row_index).to_usize().ok_or_else(|| { @@ -2678,33 +2683,31 @@ where })?; let count = O::usize_as(count); let start = offset_window[0]; - let end = if start + count > offset_window[1] { - let value = (start + count - offset_window[1]).try_into().map_err(|_| { - exec_datafusion_err!("array_resize: failed to convert size to i64") - })?; - default_element_array.push(Some(value)); - offset_window[1] + if start + count > offset_window[1] { + let extra_count = + (start + count - offset_window[1]).try_into().map_err(|_| { + exec_datafusion_err!("array_resize: failed to convert size to i64") + })?; + let end = offset_window[1]; + mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap()); + // append default element + for _ in 0..extra_count { + mutable.extend(1, row_index, row_index + 1); + } } else { - default_element_array.push(None); - start + count + let end = start + count; + mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap()); }; - mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap()); - offsets.push(offsets[row_index] + end - start); + offsets.push(offsets[row_index] + count); } - let default_element_array = Arc::new(Int64Array::from(default_element_array)); - let default_element_array = - general_repeat::(&default_element, &default_element_array)?; - let data = mutable.freeze(); - let original_part = Arc::new(GenericListArray::::try_new( + Ok(Arc::new(GenericListArray::::try_new( field.clone(), OffsetBuffer::::new(offsets.into()), arrow_array::make_array(data), None, - )?); - - array_concat(&[original_part, default_element_array]) + )?)) } #[cfg(test)] From 591bd1f4a00bf073777aa0c1ae7a1a18098d1cf4 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Wed, 10 Jan 2024 22:35:20 +0800 Subject: [PATCH 14/14] fix error --- datafusion/physical-expr/src/array_expressions.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index f91a2f6bc3bbf..5b35c4b9d8fb2 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -36,8 +36,8 @@ use datafusion_common::cast::{ }; use datafusion_common::utils::{array_into_list_array, list_ndims}; use datafusion_common::{ - exec_datafusion_err, exec_err, internal_err, not_impl_err, plan_err, DataFusionError, - Result, ScalarValue, + exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, + DataFusionError, Result, ScalarValue, }; use itertools::Itertools; @@ -2679,14 +2679,16 @@ where for (row_index, offset_window) in array.offsets().windows(2).enumerate() { let count = count_array.value(row_index).to_usize().ok_or_else(|| { - exec_datafusion_err!("array_resize: failed to convert size to usize") + internal_datafusion_err!("array_resize: failed to convert size to usize") })?; let count = O::usize_as(count); let start = offset_window[0]; if start + count > offset_window[1] { let extra_count = (start + count - offset_window[1]).try_into().map_err(|_| { - exec_datafusion_err!("array_resize: failed to convert size to i64") + internal_datafusion_err!( + "array_resize: failed to convert size to i64" + ) })?; let end = offset_window[1]; mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap());