Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,13 @@ fn build_project_plan(
}

for field in input.schema().fields() {
if !fields_set.contains(field.name()) {
fields_set.insert(field.name().to_owned());
if fields_set.insert(field.qualified_name()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice simplification

Use qualified name instead of unqualified name in the HashSet that tracks which fields have already been added to the projection

This is correct 👍 It used to be inconsistent with L287 which uses qualified name.

fields.push(field.clone());
project_exprs.push(Expr::Column(field.qualified_column()));
}
}

let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?;
schema.merge(input.schema());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge is done in the previous for loop so I think this is redundant. Let's remove it.

let schema = DFSchema::new_with_metadata(fields, HashMap::new())?;

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
project_exprs,
Expand Down Expand Up @@ -699,6 +697,7 @@ fn replace_common_expr(
mod test {
use super::*;
use crate::test::*;
use datafusion_expr::logical_plan::JoinType;
use datafusion_expr::{
avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, sum,
Operator,
Expand Down Expand Up @@ -890,4 +889,30 @@ mod test {
assert!(field_set.insert(field.qualified_name()));
}
}

#[test]
fn redundant_project_fields_join_input() {
let table_scan_1 = test_table_scan_with_name("test1").unwrap();
let table_scan_2 = test_table_scan_with_name("test2").unwrap();
let join = LogicalPlanBuilder::from(table_scan_1)
.join(&table_scan_2, JoinType::Inner, (vec!["a"], vec!["a"]), None)
.unwrap()
.build()
.unwrap();
let affected_id: HashSet<Identifier> =
["c+a".to_string(), "d+a".to_string()].into_iter().collect();
let expr_set = [
("c+a".to_string(), (col("c+a"), 1, DataType::UInt32)),
("d+a".to_string(), (col("d+a"), 1, DataType::UInt32)),
]
.into_iter()
.collect();
let project = build_project_plan(join, affected_id.clone(), &expr_set).unwrap();
let project_2 = build_project_plan(project, affected_id, &expr_set).unwrap();

let mut field_set = HashSet::new();
for field in project_2.schema().fields() {
assert!(field_set.insert(field.qualified_name()));
}
}
}