Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

//! [`ColumnarValue`] represents the result of evaluating an expression.

use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::array::{Array, ArrayRef, NullArray};
use arrow::compute::{kernels, CastOptions};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::format::DEFAULT_CAST_OPTIONS;
Expand Down Expand Up @@ -218,6 +217,17 @@ impl ColumnarValue {
}
}
}

/// Converts an [`ArrayRef`] to a [`ColumnarValue`] based on the supplied arguments.
/// This is useful for scalar UDF implementations to fulfil their contract:
/// if all arguments are scalar values, the result should also be a scalar value.
pub fn from_args_and_result(args: &[Self], result: ArrayRef) -> Result<Self> {
if result.len() == 1 && args.iter().all(|arg| matches!(arg, Self::Scalar(_))) {
Ok(Self::Scalar(ScalarValue::try_from_array(&result, 0)?))
} else {
Ok(Self::Array(result))
Comment on lines +221 to +228
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we allow such code to exist, wouldn't it be better to just have it during constant-folding, rather than inside function implementations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already exists in constant folding, but we shouldn't rely on constant folding for the correct function implementation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That depends how we define "correct".
IF constant folding is the only party where maintaining ColumnarValue::Scalar matters, THEN we can redefine "correct", loosening the contract.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the only place where it matters. If for some reason the expression is not constant folded, you will get an error that the array lengths of different columns don't match.

}
}
}

#[cfg(test)]
Expand Down
16 changes: 8 additions & 8 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,8 @@ macro_rules! make_math_unary_udf {
$EVALUATE_BOUNDS(inputs)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that @simonvandel and @jonahgao have been removing use of make_math_unary_udf in general

(or example #12889)

We should probably make sure we aren't missing a similar problem in their reviews

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add an attribute like maintains_scalar to ScalarUDFImpl and then automatically enforce this process during evaluation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's not necessarily true for volatile functions. But I wonder then how will the function know the array length? E.g. for a rand(max) function if I call it like rand(42) which is pretty reasonable, how does it work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a function needs to maintain scalar, we can call ColumnarValue::from_args_and_result after invoking it.

        // in file: datafusion/physical-expr/src/scalar_function.rs
        // evaluate the function
        let output = match self.args.is_empty() {
            true => self.fun.invoke_no_args(batch.num_rows()),
            false => self.fun.invoke(&inputs),
        }?;

        if let ColumnarValue::Array(array) = &output {
            if self.fun.need_maintain_scalar() && !inputs.is_empty() {
                output = ColumnarValue::from_args_and_result(inputs, array)
            }
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that if the function is volatile but it has only scalar arguments, how does it know how many rows to produce?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah anyway, I was just trying to show that this is a deficiency of the UDF model. Ideally, invoke should also get the number of expected rows and then there would also be no need for a special invoke_no_args.

Regarding adding a flag with default false, that doesn't sound very useful. Maybe we can just assume that if the result of any immutable function is an array of size one, we might as well convert it to a scalar value.

Copy link
Member

@jonahgao jonahgao Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, invoke should also get the number of expected rows and then there would also be no need for a special invoke_no_args.

Make sense to me. This makes it easy to support rand_int. It seems that currently all functions should return a scalar value when all their arguments are scalar, for example, math functions that do not use make_math_*_udf.

Maybe we can just assume that if the result of any immutable function is an array of size one, we might as well convert it to a scalar value.

This can be a general solution. The disadvantage is that if the arguments are not real scalar, it will cause unnecessary overhead, as the final result of queries will still need to be converted back into arrays.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joroKr21 @alamb Filed a new issue for other functions #12959

Copy link
Contributor Author

@joroKr21 joroKr21 Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a general solution. The disadvantage is that if the arguments are not real scalar, it will cause unnecessary overhead, as the final result of queries will still need to be converted back into arrays.

Yeah, but it sounds very unlikely that we will have a batch of one row. In any case, we can do the same check that all arguments are scalar 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but it sounds very unlikely that we will have a batch of one row.

it's unlikely, but you must not assume that.

a trivial example is a source table with one row.

we can do the same check that all arguments are scalar 👍

👍

let args = ColumnarValue::values_to_arrays(col_args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => {
Arc::new(make_function_scalar_inputs_return_type!(
Expand All @@ -255,7 +254,8 @@ macro_rules! make_math_unary_udf {
)
}
};
Ok(ColumnarValue::Array(arr))

ColumnarValue::from_args_and_result(col_args, arr)
}
}
}
Expand Down Expand Up @@ -336,9 +336,8 @@ macro_rules! make_math_binary_udf {
$OUTPUT_ORDERING(input)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(col_args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
Expand All @@ -364,7 +363,8 @@ macro_rules! make_math_binary_udf {
)
}
};
Ok(ColumnarValue::Array(arr))

ColumnarValue::from_args_and_result(col_args, arr)
}
}
}
Expand Down