9 changes: 2 additions & 7 deletions datafusion/core/tests/sql/mod.rs
@@ -971,13 +971,8 @@ async fn plan_and_collect(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordB
async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
let df = ctx.sql(sql).await.unwrap();

- // We are not really interested in the direct output of optimized_logical_plan
- // since the physical plan construction already optimizes the given logical plan
- // and we want to avoid double-optimization as a consequence. So we just construct
- // it here to make sure that it doesn't fail at this step and get the optimized
- // schema (to assert later that the logical and optimized schemas are the same).
- let optimized = df.clone().into_optimized_plan().unwrap();
- assert_eq!(df.logical_plan().schema(), optimized.schema());
Comment on lines -979 to -980
@jackwener (Member Author) commented on Jun 9, 2023:
We shouldn't check the schema here, because the analyzer will change the schema.

The optimizer already contains a schema checker, so we don't need the assert_eq.

Contributor:
I agree this check predated the existence of the analyzer.

+ // optimize just for check schema don't change during optimization.
+ df.clone().into_optimized_plan().unwrap();

df.collect().await.unwrap()
}
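To make the reasoning in the thread above concrete, here is a small sketch (ours, not part of the PR) of why the two schemas compared by the old `assert_eq!` can legitimately differ once the analyzer runs type coercion. It assumes an async (e.g. Tokio) context, uses only APIs that already appear in the test above, and the SQL text plus the function name are illustrative:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: why `assert_eq!(df.logical_plan().schema(), optimized.schema())`
// is no longer a safe invariant once the analyzer may rewrite types.
async fn schema_may_change(ctx: &SessionContext) -> Result<()> {
    // A CASE whose branches need coercion (Int64 THEN branch vs Float64 ELSE branch).
    let df = ctx
        .sql("SELECT CASE WHEN true THEN 1 ELSE 2.5 END AS v")
        .await?;

    // Schema of the plan as produced by the SQL planner...
    let before = df.logical_plan().schema().clone();
    // ...and after the analyzer and optimizer have run.
    let optimized = df.clone().into_optimized_plan()?;
    let after = optimized.schema().clone();

    // With type coercion happening in the analyzer, `before` and `after`
    // are not guaranteed to be equal, which is why the test now only
    // checks that optimization succeeds.
    println!("before: {before:?}\nafter: {after:?}");
    Ok(())
}
```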
8 changes: 4 additions & 4 deletions datafusion/core/tests/sqllogictests/test_files/scalar.slt
@@ -646,27 +646,27 @@ SELECT CASE WHEN NULL THEN 'foo' ELSE 'bar' END
bar

# case_expr_with_null()
- query I
+ query ?
@jackwener (Member Author) commented:
It's strange; I can't understand it.

I executed this SQL to confirm that the type coercion is right:

 select arrow_typeof(case when b is null then null else b end) from (select a,b from (values (1,null),(2,3)) as t (a,b)) a;

+------------------------------------------------------------+
| arrow_typeof(CASE WHEN a.b IS NULL THEN NULL ELSE a.b END) |
+------------------------------------------------------------+
| Int64                                                      |
| Int64                                                      |
+------------------------------------------------------------+
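The same probe can also be run from Rust rather than the CLI. The sketch below is ours (not part of the original comment); it assumes a `SessionContext`, an async runtime such as Tokio, and a hypothetical function name, and simply prints the resulting batch via `DataFrame::show`:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Runs the arrow_typeof probe from the comment above and prints the result.
async fn confirm_case_coercion(ctx: &SessionContext) -> Result<()> {
    ctx.sql(
        "SELECT arrow_typeof(CASE WHEN b IS NULL THEN NULL ELSE b END) \
         FROM (SELECT a, b FROM (VALUES (1, NULL), (2, 3)) AS t (a, b)) a",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```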

select case when b is null then null else b end from (select a,b from (values (1,null),(2,3)) as t (a,b)) a;
----
NULL
3

- query I
+ query ?
select case when b is null then null else b end from (select a,b from (values (1,1),(2,3)) as t (a,b)) a;
----
1
3

# case_expr_with_nulls()
- query I
+ query ?
select case when b is null then null when b < 3 then null when b >=3 then b + 1 else b end from (select a,b from (values (1,null),(1,2),(2,3)) as t (a,b)) a
----
NULL
NULL
4

- query I
+ query ?
select case b when 1 then null when 2 then null when 3 then b + 1 else b end from (select a,b from (values (1,null),(1,2),(2,3)) as t (a,b)) a;
----
NULL
4 changes: 2 additions & 2 deletions datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -432,13 +432,13 @@ logical_plan
Sort: t1.c1 ASC NULLS LAST
--Union
----TableScan: t1 projection=[c1]
- ----Projection: t2.c1a AS t1.c1
+ ----Projection: t2.c1a AS c1
------TableScan: t2 projection=[c1a]
physical_plan
SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
--UnionExec
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
- ----ProjectionExec: expr=[c1a@0 as t1.c1]
+ ----ProjectionExec: expr=[c1a@0 as c1]
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a], output_ordering=[c1a@0 ASC NULLS LAST], has_header=true

statement ok
22 changes: 1 addition & 21 deletions datafusion/expr/src/expr_schema.rs
@@ -22,7 +22,6 @@ use crate::expr::{
};
use crate::field_util::get_indexed_field;
use crate::type_coercion::binary::get_result_type;
- use crate::type_coercion::other::get_coerce_type_for_case_expression;
use crate::{
aggregate_function, function, window_function, LogicalPlan, Projection, Subquery,
};
@@ -73,26 +72,7 @@ impl ExprSchemable for Expr {
Expr::OuterReferenceColumn(ty, _) => Ok(ty.clone()),
Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
Expr::Literal(l) => Ok(l.get_datatype()),
- Expr::Case(case) => {
-     // https://github.com/apache/arrow-datafusion/issues/5821
-     // when #5681 will be fixed, this code can be reverted to:
-     // case.when_then_expr[0].1.get_type(schema)
-     let then_types = case
-         .when_then_expr
-         .iter()
-         .map(|when_then| when_then.1.get_type(schema))
-         .collect::<Result<Vec<_>>>()?;
-     let else_type = match &case.else_expr {
-         None => Ok(None),
-         Some(expr) => expr.get_type(schema).map(Some),
-     }?;
-     get_coerce_type_for_case_expression(&then_types, else_type.as_ref())
-         .ok_or_else(|| {
-             DataFusionError::Internal(String::from(
-                 "Cannot infer type for CASE statement",
-             ))
-         })
- }
+ Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
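The one-line replacement reads a CASE expression's type from its first THEN branch; once the analyzer's type coercion has run, every THEN branch (and the ELSE branch, if present) shares that type, so the first branch is representative. Below is a hedged sketch of that invariant, not code from the PR: the helper name is ours, and the shape of the `get_type` call is assumed from the surrounding impl.

```rust
use arrow::datatypes::DataType;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::Case;
use datafusion_expr::ExprSchemable;

// Returns the output type of a CASE expression by looking only at the first
// THEN branch; in debug builds, also checks that type coercion really did
// make every branch agree on that type (this only holds for analyzed plans).
fn case_output_type(case: &Case, schema: &DFSchema) -> Result<DataType> {
    let first = case.when_then_expr[0].1.get_type(schema)?;
    for (_when, then) in &case.when_then_expr {
        debug_assert_eq!(then.get_type(schema)?, first);
    }
    if let Some(else_expr) = &case.else_expr {
        debug_assert_eq!(else_expr.get_type(schema)?, first);
    }
    Ok(first)
}
```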
11 changes: 9 additions & 2 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -42,7 +42,7 @@ use datafusion_expr::utils::from_plan;
use datafusion_expr::{
aggregate_function, function, is_false, is_not_false, is_not_true, is_not_unknown,
is_true, is_unknown, type_coercion, AggregateFunction, Expr, LogicalPlan, Operator,
- WindowFrame, WindowFrameBound, WindowFrameUnits,
+ Projection, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion_expr::{ExprSchemable, Signature};

@@ -108,7 +108,14 @@ fn analyze_internal(
})
.collect::<Result<Vec<_>>>()?;

- from_plan(plan, &new_expr, &new_inputs)
+ // TODO: use from_plan after fix https://github.com/apache/arrow-datafusion/issues/6613
+ match &plan {
+     LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new(
+         new_expr,
+         Arc::new(new_inputs[0].clone()),
+     )?)),
+     _ => from_plan(plan, &new_expr, &new_inputs),
+ }
}

pub(crate) struct TypeCoercionRewriter {
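The special case for projections presumably exists because `Projection::try_new` derives a fresh output schema from the rewritten, coerced expressions, whereas the generic `from_plan` path can carry the node's old schema forward (the problem tracked by the linked issue #6613). A standalone sketch of that shape is below; it is ours, not code from the PR, and the helper name is hypothetical:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, Projection};

// Rebuilds a projection node from already-coerced expressions.
// Projection::try_new recomputes the output schema from `coerced_exprs`,
// so any type changes made by the analyzer are reflected in the new node.
fn rebuild_projection(coerced_exprs: Vec<Expr>, input: LogicalPlan) -> Result<LogicalPlan> {
    Ok(LogicalPlan::Projection(Projection::try_new(
        coerced_exprs,
        Arc::new(input),
    )?))
}
```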