Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 164 additions & 3 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
_ => expr.get_type(schema),
},
Expr::Negative(expr) => expr.get_type(schema),
Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
Expr::Column(c) => {
// First try to resolve the column as-is
match schema.data_type(c) {
Ok(data_type) => Ok(data_type.clone()),
Err(e) => {
// If the column has a qualifier but wasn't found, try without the qualifier
// This handles cases where aggregations produce unqualified schemas
// but subsequent operations still reference the qualified names
if c.relation.is_some() {
let unqualified = Column::new_unqualified(&c.name);
match schema.data_type(&unqualified) {
Ok(data_type) => Ok(data_type.clone()),
Err(_) => Err(e), // Return the original error
}
} else {
Err(e)
}
}
}
}
Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
Expr::Literal(l, _) => Ok(l.data_type()),
Expand Down Expand Up @@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
|| low.nullable(input_schema)?
|| high.nullable(input_schema)?),

Expr::Column(c) => input_schema.nullable(c),
Expr::Column(c) => {
// First try to resolve the column as-is
match input_schema.nullable(c) {
Ok(nullable) => Ok(nullable),
Err(e) => {
// If the column has a qualifier but wasn't found, try without the qualifier
// This handles cases where aggregations produce unqualified schemas
// but subsequent operations still reference the qualified names
if c.relation.is_some() {
let unqualified = Column::new_unqualified(&c.name);
match input_schema.nullable(&unqualified) {
Ok(nullable) => Ok(nullable),
Err(_) => Err(e), // Return the original error
}
} else {
Err(e)
}
}
}
}
Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
Expr::Literal(value, _) => Ok(value.is_null()),
Expr::Case(case) => {
Expand Down Expand Up @@ -777,9 +815,12 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq
#[cfg(test)]
mod tests {
use super::*;
use crate::test::function_stub::avg;
use crate::{col, lit, out_ref_col_with_metadata};

use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue};
use datafusion_common::{
internal_err, Column, DFSchema, HashMap, ScalarValue, TableReference,
};

macro_rules! test_is_expr_nullable {
($EXPR_TYPE:ident) => {{
Expand Down Expand Up @@ -881,6 +922,126 @@ mod tests {
);
}

#[test]
fn test_qualified_column_after_aggregation() {
    // Regression test: a qualified column reference must still resolve when
    // the schema it is evaluated against only carries the unqualified name,
    // which is the shape this test uses to stand in for an aggregation's
    // output.

    // Stand-in for an aggregation result: a single unqualified, non-nullable
    // Float64 field called "value".
    let agg_like_schema = DFSchema::from_unqualified_fields(
        vec![Field::new("value", DataType::Float64, false)].into(),
        std::collections::HashMap::new(),
    )
    .unwrap();

    // Reference that still carries the original qualifier, as in
    // `avg(memory_usage_bytes) / 1024`.
    let column_expr = col("memory_usage_bytes.value");

    // Without the unqualified fallback this lookup is expected to fail with
    // "No field named memory_usage_bytes.value. Valid fields are value."
    assert_eq!(
        column_expr.get_type(&agg_like_schema).unwrap(),
        DataType::Float64
    );

    // Nullability must come from the same unqualified field.
    let is_nullable = column_expr.nullable(&agg_like_schema).unwrap();
    assert!(!is_nullable);

    // The reference must also be usable as an operand of a binary expression.
    let division = column_expr / lit(1024);
    assert_eq!(
        division.get_type(&agg_like_schema).unwrap(),
        DataType::Float64
    );
}

#[test]
fn test_qualified_column_fallback_behavior() {
    // Verifies three properties of the qualified -> unqualified fallback:
    //   1. a qualified reference resolves to the matching unqualified field,
    //      yielding that field's exact type and nullability;
    //   2. when the fallback also fails, the error reported is the one from
    //      the original (qualified) lookup;
    //   3. unqualified references get no fallback and report their own name.
    // Both `get_type` and `nullable` are exercised, since each carries its
    // own copy of the fallback logic.
    let unqualified_schema = DFSchema::from_unqualified_fields(
        vec![Field::new("existing_col", DataType::Int32, true)].into(),
        std::collections::HashMap::new(),
    )
    .unwrap();

    // Case 1: the qualified column exists unqualified. Assert the resolved
    // values rather than just `is_ok()`, so that resolving to the wrong
    // field would be caught.
    let qualified_existing = col("table.existing_col");
    assert_eq!(
        qualified_existing.get_type(&unqualified_schema).unwrap(),
        DataType::Int32
    );
    assert!(qualified_existing.nullable(&unqualified_schema).unwrap());

    // Case 2: the qualified column does not exist under either name; the
    // error must still mention the qualified name the caller asked for.
    let qualified_nonexistent = col("table.nonexistent_col");
    let type_error = qualified_nonexistent
        .get_type(&unqualified_schema)
        .unwrap_err()
        .to_string();
    assert!(type_error.contains("table.nonexistent_col"));
    let nullable_error = qualified_nonexistent
        .nullable(&unqualified_schema)
        .unwrap_err()
        .to_string();
    assert!(nullable_error.contains("table.nonexistent_col"));

    // Case 3: an unqualified column that does not exist reports its own name
    // and never picks up a table prefix.
    let unqualified_nonexistent = col("nonexistent_col");
    let type_error = unqualified_nonexistent
        .get_type(&unqualified_schema)
        .unwrap_err()
        .to_string();
    assert!(type_error.contains("nonexistent_col"));
    assert!(!type_error.contains("table.nonexistent_col"));
    let nullable_error = unqualified_nonexistent
        .nullable(&unqualified_schema)
        .unwrap_err()
        .to_string();
    assert!(nullable_error.contains("nonexistent_col"));
    assert!(!nullable_error.contains("table.nonexistent_col"));
}

#[test]
fn test_aggregation_scenario() {
    // End-to-end check: build a real aggregate plan over a scanned table and
    // resolve a qualified reference against the plan's output schema.
    use crate::logical_plan::builder::{LogicalPlanBuilder, LogicalTableSource};
    use arrow::datatypes::{Schema, TimeUnit};
    use std::sync::Arc;

    // Source table "metrics" with two non-nullable columns.
    let usage_field = Field::new("usage_bytes", DataType::Int64, false);
    let timestamp_field = Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Second, None),
        false,
    );
    let source_schema = Arc::new(Schema::new(vec![usage_field, timestamp_field]));
    let table_source = Arc::new(LogicalTableSource::new(source_schema));

    // scan -> aggregate with no grouping and a single avg over the qualified
    // input column.
    let group_by = Vec::<Expr>::new();
    let aggregates = vec![avg(col("metrics.usage_bytes"))];
    let plan = LogicalPlanBuilder::scan("metrics", table_source, None)
        .unwrap()
        .aggregate(group_by, aggregates)
        .unwrap()
        .build()
        .unwrap();

    // Take whatever name the aggregate gave its first output field and
    // re-qualify it with the source table name, so the lookup below has to
    // go through the unqualified fallback.
    let agg_schema = plan.schema();
    let output_name = agg_schema.field(0).name();
    let requalified = Column::new(Some(TableReference::bare("metrics")), output_name);

    let result = Expr::Column(requalified).get_type(agg_schema);
    assert!(
        result.is_ok(),
        "Failed to resolve qualified column after aggregation: {:?}",
        result.err()
    );
    assert_eq!(result.unwrap(), DataType::Float64);
}

#[test]
fn test_expr_metadata() {
let mut meta = HashMap::new();
Expand Down