diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index c844bbd5384b4..d81ef1c22a13c 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
                 _ => expr.get_type(schema),
             },
             Expr::Negative(expr) => expr.get_type(schema),
-            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+            Expr::Column(c) => {
+                // First try to resolve the column as-is
+                match schema.data_type(c) {
+                    Ok(data_type) => Ok(data_type.clone()),
+                    Err(e) => {
+                        // If the column has a qualifier but wasn't found, try without the qualifier
+                        // This handles cases where aggregations produce unqualified schemas
+                        // but subsequent operations still reference the qualified names
+                        if c.relation.is_some() {
+                            let unqualified = Column::new_unqualified(&c.name);
+                            match schema.data_type(&unqualified) {
+                                Ok(data_type) => Ok(data_type.clone()),
+                                Err(_) => Err(e), // Return the original error
+                            }
+                        } else {
+                            Err(e)
+                        }
+                    }
+                }
+            }
             Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
             Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
             Expr::Literal(l, _) => Ok(l.data_type()),
@@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
                 || low.nullable(input_schema)?
                 || high.nullable(input_schema)?),

-            Expr::Column(c) => input_schema.nullable(c),
+            Expr::Column(c) => {
+                // First try to resolve the column as-is
+                match input_schema.nullable(c) {
+                    Ok(nullable) => Ok(nullable),
+                    Err(e) => {
+                        // If the column has a qualifier but wasn't found, try without the qualifier
+                        // This handles cases where aggregations produce unqualified schemas
+                        // but subsequent operations still reference the qualified names
+                        if c.relation.is_some() {
+                            let unqualified = Column::new_unqualified(&c.name);
+                            match input_schema.nullable(&unqualified) {
+                                Ok(nullable) => Ok(nullable),
+                                Err(_) => Err(e), // Return the original error
+                            }
+                        } else {
+                            Err(e)
+                        }
+                    }
+                }
+            }
             Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
             Expr::Literal(value, _) => Ok(value.is_null()),
             Expr::Case(case) => {
@@ -777,9 +815,12 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
 {
@@ -881,6 +922,126 @@ mod tests {
         );
     }

+    #[test]
+    fn test_qualified_column_after_aggregation() {
+        // Test for qualified column reference resolution after aggregation
+        // This test verifies the fix for the issue where binary expressions
+        // fail when referencing qualified column names after aggregation
+        // produces unqualified schemas.
+
+        // Create a schema that simulates the result of an aggregation
+        // where the output field is unqualified (just "value")
+        let unqualified_schema = DFSchema::from_unqualified_fields(
+            vec![Field::new("value", DataType::Float64, false)].into(),
+            std::collections::HashMap::new(),
+        )
+        .unwrap();
+
+        // Create a qualified column reference as would be produced
+        // in a query like: avg(memory_usage_bytes) / 1024
+        // where the aggregation produces "value" but the binary expression
+        // still references the original qualified name
+        let qualified_col = col("memory_usage_bytes.value");
+
+        // Before the fix, this would fail with:
+        // "No field named memory_usage_bytes.value. Valid fields are value."
+        // After the fix, it should successfully resolve to the unqualified "value" field
+        let data_type = qualified_col.get_type(&unqualified_schema).unwrap();
+        assert_eq!(data_type, DataType::Float64);
+
+        // Test nullable resolution as well
+        let nullable = qualified_col.nullable(&unqualified_schema).unwrap();
+        assert!(!nullable);
+
+        // Test with binary expression
+        let expr = qualified_col / lit(1024);
+        let data_type = expr.get_type(&unqualified_schema).unwrap();
+        assert_eq!(data_type, DataType::Float64);
+    }
+
+    #[test]
+    fn test_qualified_column_fallback_behavior() {
+        // Test that the fallback only happens for qualified columns and preserves error messages
+        let unqualified_schema = DFSchema::from_unqualified_fields(
+            vec![Field::new("existing_col", DataType::Int32, true)].into(),
+            std::collections::HashMap::new(),
+        )
+        .unwrap();
+
+        // Test 1: Qualified column that exists unqualified should work
+        let qualified_existing = col("table.existing_col");
+        assert!(qualified_existing.get_type(&unqualified_schema).is_ok());
+        assert!(qualified_existing.nullable(&unqualified_schema).is_ok());
+
+        // Test 2: Qualified column that doesn't exist should return original error
+        let qualified_nonexistent = col("table.nonexistent_col");
+        let error = qualified_nonexistent
+            .get_type(&unqualified_schema)
+            .unwrap_err();
+        assert!(error.to_string().contains("table.nonexistent_col"));
+
+        // Test 3: Unqualified column that doesn't exist should return original error (no fallback)
+        let unqualified_nonexistent = col("nonexistent_col");
+        let error = unqualified_nonexistent
+            .get_type(&unqualified_schema)
+            .unwrap_err();
+        assert!(error.to_string().contains("nonexistent_col"));
+        // Make sure it's not mentioning a qualified table prefix
+        assert!(!error.to_string().contains("table.nonexistent_col"));
+    }
+
+    #[test]
+    fn test_aggregation_scenario() {
+        // Test a realistic aggregation scenario
+        use crate::logical_plan::builder::LogicalPlanBuilder;
+        use crate::logical_plan::builder::LogicalTableSource;
+        use arrow::datatypes::Schema;
+        use std::sync::Arc;
+
+        // Create input table schema with qualified columns
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("usage_bytes", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None),
+                false,
+            ),
+        ]));
+
+        // Build a plan that does aggregation
+        let plan = LogicalPlanBuilder::scan(
+            "metrics",
+            Arc::new(LogicalTableSource::new(table_schema)),
+            None,
+        )
+        .unwrap()
+        .aggregate(
+            Vec::<Expr>::new(), // no group by
+            vec![avg(col("metrics.usage_bytes"))], // avg with qualified column
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        // Get the output schema from the aggregation
+        let agg_schema = plan.schema();
+
+        // The aggregation output should have unqualified column names
+        // Let's create a qualified reference to test the fallback mechanism
+        let actual_column_name = agg_schema.field(0).name();
+        let qualified_ref =
+            Column::new(Some(TableReference::bare("metrics")), actual_column_name);
+
+        // This should work due to the fallback mechanism
+        let result = Expr::Column(qualified_ref).get_type(agg_schema);
+        assert!(
+            result.is_ok(),
+            "Failed to resolve qualified column after aggregation: {:?}",
+            result.err()
+        );
+        assert_eq!(result.unwrap(), DataType::Float64);
+    }
+
     #[test]
     fn test_expr_metadata() {
         let mut meta = HashMap::new();