From ebf32b066e2de1f79abab4f6c7acdc0bc70c094b Mon Sep 17 00:00:00 2001 From: Alex Araujo Date: Wed, 17 Sep 2025 16:43:40 -0500 Subject: [PATCH] fix: resolve qualified column references after aggregation Adds fallback logic to handle qualified column references when they fail to resolve directly in the schema. This commonly occurs when aggregations produce unqualified schemas but subsequent operations still reference qualified column names. The fix preserves original error messages and only applies the fallback for qualified columns that fail initial resolution. --- datafusion/expr/src/expr_schema.rs | 167 ++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index c844bbd5384b4..d81ef1c22a13c 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -111,7 +111,26 @@ impl ExprSchemable for Expr { _ => expr.get_type(schema), }, Expr::Negative(expr) => expr.get_type(schema), - Expr::Column(c) => Ok(schema.data_type(c)?.clone()), + Expr::Column(c) => { + // First try to resolve the column as-is + match schema.data_type(c) { + Ok(data_type) => Ok(data_type.clone()), + Err(e) => { + // If the column has a qualifier but wasn't found, try without the qualifier + // This handles cases where aggregations produce unqualified schemas + // but subsequent operations still reference the qualified names + if c.relation.is_some() { + let unqualified = Column::new_unqualified(&c.name); + match schema.data_type(&unqualified) { + Ok(data_type) => Ok(data_type.clone()), + Err(_) => Err(e), // Return the original error + } + } else { + Err(e) + } + } + } + } Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()), Expr::ScalarVariable(ty, _) => Ok(ty.clone()), Expr::Literal(l, _) => Ok(l.data_type()), @@ -275,7 +294,26 @@ impl ExprSchemable for Expr { || low.nullable(input_schema)? 
|| high.nullable(input_schema)?), - Expr::Column(c) => input_schema.nullable(c), + Expr::Column(c) => { + // First try to resolve the column as-is + match input_schema.nullable(c) { + Ok(nullable) => Ok(nullable), + Err(e) => { + // If the column has a qualifier but wasn't found, try without the qualifier + // This handles cases where aggregations produce unqualified schemas + // but subsequent operations still reference the qualified names + if c.relation.is_some() { + let unqualified = Column::new_unqualified(&c.name); + match input_schema.nullable(&unqualified) { + Ok(nullable) => Ok(nullable), + Err(_) => Err(e), // Return the original error + } + } else { + Err(e) + } + } + } + } Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()), Expr::Literal(value, _) => Ok(value.is_null()), Expr::Case(case) => { @@ -777,9 +815,12 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result {{ @@ -881,6 +922,126 @@ mod tests { ); } + #[test] + fn test_qualified_column_after_aggregation() { + // Test for qualified column reference resolution after aggregation + // This test verifies the fix for the issue where binary expressions + // fail when referencing qualified column names after aggregation + // produces unqualified schemas. + + // Create a schema that simulates the result of an aggregation + // where the output field is unqualified (just "value") + let unqualified_schema = DFSchema::from_unqualified_fields( + vec![Field::new("value", DataType::Float64, false)].into(), + std::collections::HashMap::new(), + ) + .unwrap(); + + // Create a qualified column reference as would be produced + // in a query like: avg(memory_usage_bytes) / 1024 + // where the aggregation produces "value" but the binary expression + // still references the original qualified name + let qualified_col = col("memory_usage_bytes.value"); + + // Before the fix, this would fail with: + // "No field named memory_usage_bytes.value. Valid fields are value." 
+ // After the fix, it should successfully resolve to the unqualified "value" field + let data_type = qualified_col.get_type(&unqualified_schema).unwrap(); + assert_eq!(data_type, DataType::Float64); + + // Test nullable resolution as well + let nullable = qualified_col.nullable(&unqualified_schema).unwrap(); + assert!(!nullable); + + // Test with binary expression + let expr = qualified_col / lit(1024); + let data_type = expr.get_type(&unqualified_schema).unwrap(); + assert_eq!(data_type, DataType::Float64); + } + + #[test] + fn test_qualified_column_fallback_behavior() { + // Test that the fallback only happens for qualified columns and preserves error messages + let unqualified_schema = DFSchema::from_unqualified_fields( + vec![Field::new("existing_col", DataType::Int32, true)].into(), + std::collections::HashMap::new(), + ) + .unwrap(); + + // Test 1: Qualified column that exists unqualified should work + let qualified_existing = col("table.existing_col"); + assert!(qualified_existing.get_type(&unqualified_schema).is_ok()); + assert!(qualified_existing.nullable(&unqualified_schema).is_ok()); + + // Test 2: Qualified column that doesn't exist should return original error + let qualified_nonexistent = col("table.nonexistent_col"); + let error = qualified_nonexistent + .get_type(&unqualified_schema) + .unwrap_err(); + assert!(error.to_string().contains("table.nonexistent_col")); + + // Test 3: Unqualified column that doesn't exist should return original error (no fallback) + let unqualified_nonexistent = col("nonexistent_col"); + let error = unqualified_nonexistent + .get_type(&unqualified_schema) + .unwrap_err(); + assert!(error.to_string().contains("nonexistent_col")); + // Make sure it's not mentioning a qualified table prefix + assert!(!error.to_string().contains("table.nonexistent_col")); + } + + #[test] + fn test_aggregation_scenario() { + // Test a realistic aggregation scenario + use crate::logical_plan::builder::LogicalPlanBuilder; + use 
crate::logical_plan::builder::LogicalTableSource; + use arrow::datatypes::Schema; + use std::sync::Arc; + + // Create input table schema with qualified columns + let table_schema = Arc::new(Schema::new(vec![ + Field::new("usage_bytes", DataType::Int64, false), + Field::new( + "timestamp", + DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None), + false, + ), + ])); + + // Build a plan that does aggregation + let plan = LogicalPlanBuilder::scan( + "metrics", + Arc::new(LogicalTableSource::new(table_schema)), + None, + ) + .unwrap() + .aggregate( + Vec::<Expr>::new(), // no group by + vec![avg(col("metrics.usage_bytes"))], // avg with qualified column + ) + .unwrap() + .build() + .unwrap(); + + // Get the output schema from the aggregation + let agg_schema = plan.schema(); + + // The aggregation output should have unqualified column names + // Let's create a qualified reference to test the fallback mechanism + let actual_column_name = agg_schema.field(0).name(); + let qualified_ref = + Column::new(Some(TableReference::bare("metrics")), actual_column_name); + + // This should work due to the fallback mechanism + let result = Expr::Column(qualified_ref).get_type(agg_schema); + assert!( + result.is_ok(), + "Failed to resolve qualified column after aggregation: {:?}", + result.err() + ); + assert_eq!(result.unwrap(), DataType::Float64); + } + #[test] fn test_expr_metadata() { let mut meta = HashMap::new();