Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::{compare_rows, extract_row_at_idx_to_buf, get_row_at_idx};
use datafusion_common::{
DataFusionError, Result, ScalarValue, arrow_datafusion_err, internal_err,
not_impl_err,
};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::{AggregateOrderSensitivity, format_state_name};
Expand Down Expand Up @@ -133,8 +134,20 @@ impl AggregateUDFImpl for FirstValue {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
not_impl_err!("Not called because the return_field_from_args is implemented")
}

fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
// Preserve metadata from the first argument field
Ok(Arc::new(
Field::new(
self.name(),
arg_fields[0].data_type().clone(),
true, // always nullable, there may be no rows
)
.with_metadata(arg_fields[0].metadata().clone()),
))
}

fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
Expand Down Expand Up @@ -1071,8 +1084,20 @@ impl AggregateUDFImpl for LastValue {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
not_impl_err!("Not called because the return_field_from_args is implemented")
}

fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
// Preserve metadata from the first argument field
Ok(Arc::new(
Field::new(
self.name(),
arg_fields[0].data_type().clone(),
true, // always nullable, there may be no rows
)
.with_metadata(arg_fields[0].metadata().clone()),
))
}

fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
Expand Down
54 changes: 53 additions & 1 deletion datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::catalog::{
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, Session,
};
use datafusion::common::{not_impl_err, DataFusionError, Result};
use datafusion::common::{exec_err, not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion::functions::math::abs;
use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
use datafusion::logical_expr::{
Expand Down Expand Up @@ -398,6 +398,58 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
.unwrap();

ctx.register_batch("table_with_metadata", batch).unwrap();

// Register the get_metadata UDF for testing metadata preservation
ctx.register_udf(ScalarUDF::from(GetMetadataUdf::new()));
}

/// UDF to extract metadata from a field for testing purposes
/// Usage: get_metadata(expr, 'key') -> returns the metadata value or NULL
#[derive(Debug, PartialEq, Eq, Hash)]
struct GetMetadataUdf {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is useful enough to introduce as a function to datafusion itself, instead of being only in SLT? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to do so if you think it's worth it - I went with the conservative approach

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to do it in this PR, but worth filing a ticket for as followup

signature: Signature,
}

impl GetMetadataUdf {
fn new() -> Self {
Self {
signature: Signature::any(2, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for GetMetadataUdf {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"get_metadata"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// Get the metadata key from the second argument (must be a string literal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would also be nice if we supported a single column version that returned the metadata as a struct array too

let key = match &args.args[1] {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(k))) => k.clone(),
_ => {
return exec_err!("get_metadata second argument must be a string literal")
}
};

// Get metadata from the first argument's field
let metadata_value = args.arg_fields[0].metadata().get(&key).cloned();

// Return as a scalar (same value for all rows)
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(metadata_value)))
}
}

/// Create a UDF function named "example". See the `sample_udf.rs` example
Expand Down
49 changes: 49 additions & 0 deletions datafusion/sqllogictest/test_files/metadata.slt
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,56 @@ order by 1 asc nulls last;
3 1
NULL 1

# Regression test: first_value should preserve metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed for the existing regression tests in this file, they don't actually check metadata 🤔

# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
query I
select count(distinct name) from table_with_metadata;
----
2

With this new UDF we can look into updating those tests to be more similar to the ones introduced here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will file a ticket before we merge this to do so

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the tests I think test that there is metadata on the input tables (rather than the output tables)

I do really like the idea of adding a UDF, simlarly to arrow_typeof that can show the metadata

``sql

select arrow_typeof('foo');
+---------------------------+
| arrow_typeof(Utf8("foo")) |
+---------------------------+
| Utf8 |
+---------------------------+
1 row(s) fetched.
Elapsed 0.024 seconds.


Possibilities: add a new argument

```sql
> select arrow_typeof('foo', true);

Possibility: add a new function

> select arrow_metadata('foo');

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would help debug various metadata issues more easily

I can file a ticket if you think this is reasonable

query IT
select first_value(id order by id asc nulls last), get_metadata(first_value(id order by id asc nulls last), 'metadata_key')
from table_with_metadata;
----
1 the id field

# Regression test: last_value should preserve metadata
query IT
select last_value(id order by id asc nulls first), get_metadata(last_value(id order by id asc nulls first), 'metadata_key')
from table_with_metadata;
----
3 the id field

# Regression test: DISTINCT ON should preserve metadata (uses first_value internally)
query ITTT
select distinct on (id) id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
from table_with_metadata order by id asc nulls last;
----
1 the id field NULL the name field
3 the id field baz the name field
NULL the id field bar the name field

# Regression test: DISTINCT should preserve metadata
query ITTT
with res AS (
select distinct id, name from table_with_metadata
)
select id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
from res
order by id asc nulls last;
----
1 the id field NULL the name field
3 the id field baz the name field
NULL the id field bar the name field

# Regression test: grouped columns should preserve metadata
query ITTT
with res AS (
select name, count(*), id
from table_with_metadata
group by id, name
)
select id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
from res
order by id asc nulls last, name asc nulls last
----
1 the id field NULL the name field
3 the id field baz the name field
NULL the id field bar the name field
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if a non-existing column is passed as a first argument of the get_metadata() udf ? Or a scalar value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return an empty map?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or NULL.
I mean it would be good to have some negative test cases too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if a non-existing column is passed as a first argument of the get_metadata() udf

I think it should probably error like any other query that tries to access a undefined column


statement ok
drop table table_with_metadata;