From 2d5098ddd59e794f91e7a31d4542c1a2b6939cf8 Mon Sep 17 00:00:00 2001 From: James Xia Date: Thu, 16 Jan 2025 09:12:41 -0800 Subject: [PATCH 01/11] Propagate filter info from TableScan to ReadRel Propagate information in datafusion::logical_expr::TableScan.filters to substrait::proto::ReadRel.best_effort_filter. --- .../substrait/src/logical_plan/producer.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 9dbb246453be5..035fcb260f365 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -540,7 +540,7 @@ pub fn to_substrait_rel( } pub fn from_table_scan( - _producer: &mut impl SubstraitProducer, + producer: &mut impl SubstraitProducer, scan: &TableScan, ) -> Result> { let projection = scan.projection.as_ref().map(|p| { @@ -560,12 +560,24 @@ pub fn from_table_scan( let table_schema = scan.source.schema().to_dfschema_ref()?; let base_schema = to_substrait_named_struct(&table_schema)?; + let best_effort_filter_option = if scan.filters.len() > 0 { + let table_schema_qualified = Arc::new(DFSchema::try_from_qualified_schema(scan.table_name.clone(), &(scan.source.schema())).unwrap()); + let mut combined_expr = scan.filters[0].clone(); + for i in 1..scan.filters.len() { + combined_expr = combined_expr.and(scan.filters[i].clone()); + } + let best_effort_filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; + Some(Box::new(best_effort_filter_expr)) + } else { + None + }; + Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, base_schema: Some(base_schema), filter: None, - best_effort_filter: None, + best_effort_filter: best_effort_filter_option, projection, advanced_extension: None, read_type: Some(ReadType::NamedTable(NamedTable { From 3f8f262cda9c1a6b86fd815f5ab26499f6990f56 Mon Sep 17 00:00:00 2001 From: James Xia Date: Sat, 18 Jan 2025 22:00:26 -0800 Subject: [PATCH 02/11] Add test --- .../tests/cases/roundtrip_logical_plan.rs | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index e6b8bdbc047e3..1deb61becccc5 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1234,6 +1234,15 @@ async fn roundtrip_repartition_hash() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_read_best_effort_filter() -> Result<()> { + roundtrip_verify_read_best_effort_filter_count( + "SELECT data.a FROM data JOIN data2 ON data.a = data2.a where data.a < 5 AND data2.e > 1", + 2, + ) + .await +} + fn check_post_join_filters(rel: &Rel) -> Result<()> { // search for target_rel and field value in proto match &rel.rel_type { @@ -1319,6 +1328,93 @@ async fn verify_post_join_filter_value(proto: Box) -> Result<()> { Ok(()) } +fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { + // search for target_rel and field value in proto + match &rel.rel_type { + Some(RelType::Read(read)) => { + // increment counter for best-effort filter if not None + if read.best_effort_filter.is_some() { + *count += 1; + } + Ok(()) + } + Some(RelType::Join(join)) => { + match count_read_best_effort_filters(join.left.as_ref().unwrap().as_ref(), count) { + Err(e) => Err(e), + Ok(_) => { + count_read_best_effort_filters(join.right.as_ref().unwrap().as_ref(), count) + } + } + } + Some(RelType::Project(p)) => { + count_read_best_effort_filters(p.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::Filter(filter)) => { + count_read_best_effort_filters(filter.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::Fetch(fetch)) => { + count_read_best_effort_filters(fetch.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::Sort(sort)) => { + count_read_best_effort_filters(sort.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::Aggregate(agg)) => { + count_read_best_effort_filters(agg.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::Set(set)) => { + for input in &set.inputs { + match count_read_best_effort_filters(input, count) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + Ok(()) + } + Some(RelType::ExtensionSingle(ext)) => { + count_read_best_effort_filters(ext.input.as_ref().unwrap().as_ref(), count) + } + Some(RelType::ExtensionMulti(ext)) => { + for input in &ext.inputs { + match count_read_best_effort_filters(input, count) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + Ok(()) + } + Some(RelType::ExtensionLeaf(_)) => Ok(()), + _ => not_impl_err!( + "Unexpected RelType: {:?} in read best-effort filter check", + rel.rel_type + ), + } +} + +async fn assert_read_best_effort_filter_count(proto: Box, expected_count: u32) -> Result<()> { + let mut count : u32 = 0; + for relation in &proto.relations { + match relation.rel_type.as_ref() { + Some(rt) => match rt { + plan_rel::RelType::Rel(rel) => match count_read_best_effort_filters(rel, &mut count) { + Err(e) => return Err(e), + Ok(_) => continue, + }, + plan_rel::RelType::Root(root) => { + match count_read_best_effort_filters(root.input.as_ref().unwrap(), &mut count) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } + }, + None => return plan_err!("Cannot parse plan relation: None"), + } + } + + assert_eq!(expected_count, count); + + Ok(()) +} + async fn assert_expected_plan_unoptimized( sql: &str, expected_plan_str: &str, @@ -1489,6 +1585,14 @@ async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { verify_post_join_filter_value(proto).await } +async fn roundtrip_verify_read_best_effort_filter_count(sql: &str, expected_count: u32) -> Result<()> { + let ctx = create_context().await?; + let proto = roundtrip_with_ctx(sql, ctx).await?; + + // verify that the count of best-effort filters in read relations is as expected + assert_read_best_effort_filter_count(proto, expected_count).await +} + async fn roundtrip_all_types(sql: &str) -> Result<()> { roundtrip_with_ctx(sql, create_all_type_context().await?).await?; Ok(()) From 203b2da4929aae044a156215ea80dd4c597ebbac Mon Sep 17 00:00:00 2001 From: James Xia Date: Sun, 19 Jan 2025 20:20:00 -0800 Subject: [PATCH 03/11] cargo fmt --- .../substrait/src/logical_plan/producer.rs | 11 +++++- .../tests/cases/roundtrip_logical_plan.rs | 39 +++++++++++++------ 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 035fcb260f365..9752484c0c626 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -561,12 +561,19 @@ pub fn from_table_scan( let base_schema = to_substrait_named_struct(&table_schema)?; let best_effort_filter_option = if scan.filters.len() > 0 { - let table_schema_qualified = Arc::new(DFSchema::try_from_qualified_schema(scan.table_name.clone(), &(scan.source.schema())).unwrap()); + let table_schema_qualified = Arc::new( + DFSchema::try_from_qualified_schema( + scan.table_name.clone(), + &(scan.source.schema()), + ) + .unwrap(), + ); let mut combined_expr = scan.filters[0].clone(); for i in 1..scan.filters.len() { combined_expr = combined_expr.and(scan.filters[i].clone()); } - let best_effort_filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; + let best_effort_filter_expr = + producer.handle_expr(&combined_expr, &table_schema_qualified)?; Some(Box::new(best_effort_filter_expr)) } else { None diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 1deb61becccc5..9e725c8e72e93 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1339,11 +1339,15 @@ fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { Ok(()) } Some(RelType::Join(join)) => { - match count_read_best_effort_filters(join.left.as_ref().unwrap().as_ref(), count) { + match count_read_best_effort_filters( + join.left.as_ref().unwrap().as_ref(), + count, + ) { Err(e) => Err(e), - Ok(_) => { - count_read_best_effort_filters(join.right.as_ref().unwrap().as_ref(), count) - } + Ok(_) => count_read_best_effort_filters( + join.right.as_ref().unwrap().as_ref(), + count, + ), } } Some(RelType::Project(p)) => { @@ -1390,17 +1394,25 @@ fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { } } -async fn assert_read_best_effort_filter_count(proto: Box, expected_count: u32) -> Result<()> { - let mut count : u32 = 0; +async fn assert_read_best_effort_filter_count( + proto: Box, + expected_count: u32, +) -> Result<()> { + let mut count: u32 = 0; for relation in &proto.relations { match relation.rel_type.as_ref() { Some(rt) => match rt { - plan_rel::RelType::Rel(rel) => match count_read_best_effort_filters(rel, &mut count) { - Err(e) => return Err(e), - Ok(_) => continue, - }, + plan_rel::RelType::Rel(rel) => { + match count_read_best_effort_filters(rel, &mut count) { + Err(e) => return Err(e), + Ok(_) => continue, + } + } plan_rel::RelType::Root(root) => { - match count_read_best_effort_filters(root.input.as_ref().unwrap(), &mut count) { + match count_read_best_effort_filters( + root.input.as_ref().unwrap(), + &mut count, + ) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1585,7 +1597,10 @@ async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { verify_post_join_filter_value(proto).await } -async fn roundtrip_verify_read_best_effort_filter_count(sql: &str, expected_count: u32) -> Result<()> { +async fn roundtrip_verify_read_best_effort_filter_count( + sql: &str, + expected_count: u32, +) -> Result<()> { let ctx = create_context().await?; let proto = roundtrip_with_ctx(sql, ctx).await?; From 204d3002fd5cd6fd0299ccc7b11b34f5d34b92ff Mon Sep 17 00:00:00 2001 From: James Xia Date: Mon, 20 Jan 2025 18:01:02 -0800 Subject: [PATCH 04/11] Fix clippy error --- datafusion/substrait/src/logical_plan/producer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 9752484c0c626..e269cdfa921cf 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -560,7 +560,7 @@ pub fn from_table_scan( let table_schema = scan.source.schema().to_dfschema_ref()?; let base_schema = to_substrait_named_struct(&table_schema)?; - let best_effort_filter_option = if scan.filters.len() > 0 { + let best_effort_filter_option = if !scan.filters.is_empty() { let table_schema_qualified = Arc::new( DFSchema::try_from_qualified_schema( scan.table_name.clone(), From 867c763a643e9a5cb3eb46b6a9d06ef114574e91 Mon Sep 17 00:00:00 2001 From: James Xia Date: Tue, 21 Jan 2025 20:19:32 -0800 Subject: [PATCH 05/11] Use conjunction --- datafusion/substrait/src/logical_plan/producer.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index e269cdfa921cf..e942cdfd861c1 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -56,6 +56,7 @@ use datafusion::logical_expr::expr::{ InSubquery, WindowFunction, WindowFunctionParams, }; use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Operator}; +use datafusion::logical_expr::utils::conjunction; use datafusion::prelude::Expr; use pbjson_types::Any as ProtoAny; use substrait::proto::exchange_rel::{ExchangeKind, RoundRobin, ScatterFields}; @@ -568,10 +569,7 @@ pub fn from_table_scan( ) .unwrap(), ); - let mut combined_expr = scan.filters[0].clone(); - for i in 1..scan.filters.len() { - combined_expr = combined_expr.and(scan.filters[i].clone()); - } + let combined_expr = conjunction(scan.filters.clone()).unwrap(); let best_effort_filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; Some(Box::new(best_effort_filter_expr)) From 07862087c641621bbc220ed9c538e2e695f4e3df Mon Sep 17 00:00:00 2001 From: James Xia Date: Wed, 22 Jan 2025 07:45:25 -0800 Subject: [PATCH 06/11] cargo fmt --- datafusion/substrait/src/logical_plan/producer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index e942cdfd861c1..0c4bf302e5665 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -55,8 +55,8 @@ use datafusion::logical_expr::expr::{ AggregateFunctionParams, Alias, BinaryExpr, Case, Cast, GroupingSet, InList, InSubquery, WindowFunction, WindowFunctionParams, }; -use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Operator}; use datafusion::logical_expr::utils::conjunction; +use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Operator}; use datafusion::prelude::Expr; use pbjson_types::Any as ProtoAny; use substrait::proto::exchange_rel::{ExchangeKind, RoundRobin, ScatterFields}; From 9ff9d0a14ed670dd06dc38892b4c64226a88cae2 Mon Sep 17 00:00:00 2001 From: James Xia Date: Wed, 22 Jan 2025 13:53:50 -0800 Subject: [PATCH 07/11] Use ReadRel.filter instead of best_effort_filter --- .../substrait/src/logical_plan/producer.rs | 10 ++-- .../tests/cases/roundtrip_logical_plan.rs | 56 ++++++++----------- 2 files changed, 27 insertions(+), 39 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 0c4bf302e5665..a1f9ff1dda847 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -561,7 +561,7 @@ pub fn from_table_scan( let table_schema = scan.source.schema().to_dfschema_ref()?; let base_schema = to_substrait_named_struct(&table_schema)?; - let best_effort_filter_option = if !scan.filters.is_empty() { + let filter_option = if !scan.filters.is_empty() { let table_schema_qualified = Arc::new( DFSchema::try_from_qualified_schema( scan.table_name.clone(), @@ -570,9 +570,9 @@ pub fn from_table_scan( .unwrap(), ); let combined_expr = conjunction(scan.filters.clone()).unwrap(); - let best_effort_filter_expr = + let filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; - Some(Box::new(best_effort_filter_expr)) + Some(Box::new(filter_expr)) } else { None }; @@ -581,8 +581,8 @@ pub fn from_table_scan( rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, base_schema: Some(base_schema), - filter: None, - best_effort_filter: best_effort_filter_option, + filter: filter_option, + best_effort_filter: None, projection, advanced_extension: None, read_type: Some(ReadType::NamedTable(NamedTable { diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 9e725c8e72e93..cf167d7ce334b 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1235,8 +1235,8 @@ async fn roundtrip_repartition_hash() -> Result<()> { } #[tokio::test] -async fn roundtrip_read_best_effort_filter() -> Result<()> { - roundtrip_verify_read_best_effort_filter_count( +async fn roundtrip_read_filter() -> Result<()> { + roundtrip_verify_read_filter_count( "SELECT data.a FROM data JOIN data2 ON data.a = data2.a where data.a < 5 AND data2.e > 1", 2, ) @@ -1328,46 +1328,40 @@ async fn verify_post_join_filter_value(proto: Box) -> Result<()> { Ok(()) } -fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { +fn count_read_filters(rel: &Rel, count: &mut u32) -> Result<()> { // search for target_rel and field value in proto match &rel.rel_type { Some(RelType::Read(read)) => { - // increment counter for best-effort filter if not None - if read.best_effort_filter.is_some() { + // increment counter for read filter if not None + if read.filter.is_some() { *count += 1; } Ok(()) } Some(RelType::Join(join)) => { - match count_read_best_effort_filters( - join.left.as_ref().unwrap().as_ref(), - count, - ) { + match count_read_filters(join.left.as_ref().unwrap().as_ref(), count) { Err(e) => Err(e), - Ok(_) => count_read_best_effort_filters( - join.right.as_ref().unwrap().as_ref(), - count, - ), + Ok(_) => count_read_filters(join.right.as_ref().unwrap().as_ref(), count), } } Some(RelType::Project(p)) => { - count_read_best_effort_filters(p.input.as_ref().unwrap().as_ref(), count) + count_read_filters(p.input.as_ref().unwrap().as_ref(), count) } Some(RelType::Filter(filter)) => { - count_read_best_effort_filters(filter.input.as_ref().unwrap().as_ref(), count) + count_read_filters(filter.input.as_ref().unwrap().as_ref(), count) } Some(RelType::Fetch(fetch)) => { - count_read_best_effort_filters(fetch.input.as_ref().unwrap().as_ref(), count) + count_read_filters(fetch.input.as_ref().unwrap().as_ref(), count) } Some(RelType::Sort(sort)) => { - count_read_best_effort_filters(sort.input.as_ref().unwrap().as_ref(), count) + count_read_filters(sort.input.as_ref().unwrap().as_ref(), count) } Some(RelType::Aggregate(agg)) => { - count_read_best_effort_filters(agg.input.as_ref().unwrap().as_ref(), count) + count_read_filters(agg.input.as_ref().unwrap().as_ref(), count) } Some(RelType::Set(set)) => { for input in &set.inputs { - match count_read_best_effort_filters(input, count) { + match count_read_filters(input, count) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1375,11 +1369,11 @@ fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { Ok(()) } Some(RelType::ExtensionSingle(ext)) => { - count_read_best_effort_filters(ext.input.as_ref().unwrap().as_ref(), count) + count_read_filters(ext.input.as_ref().unwrap().as_ref(), count) } Some(RelType::ExtensionMulti(ext)) => { for input in &ext.inputs { - match count_read_best_effort_filters(input, count) { + match count_read_filters(input, count) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1388,31 +1382,25 @@ fn count_read_best_effort_filters(rel: &Rel, count: &mut u32) -> Result<()> { } Some(RelType::ExtensionLeaf(_)) => Ok(()), _ => not_impl_err!( - "Unexpected RelType: {:?} in read best-effort filter check", + "Unexpected RelType: {:?} in read filter check", rel.rel_type ), } } -async fn assert_read_best_effort_filter_count( - proto: Box, - expected_count: u32, -) -> Result<()> { +async fn assert_read_filter_count(proto: Box, expected_count: u32) -> Result<()> { let mut count: u32 = 0; for relation in &proto.relations { match relation.rel_type.as_ref() { Some(rt) => match rt { plan_rel::RelType::Rel(rel) => { - match count_read_best_effort_filters(rel, &mut count) { + match count_read_filters(rel, &mut count) { Err(e) => return Err(e), Ok(_) => continue, } } plan_rel::RelType::Root(root) => { - match count_read_best_effort_filters( - root.input.as_ref().unwrap(), - &mut count, - ) { + match count_read_filters(root.input.as_ref().unwrap(), &mut count) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1597,15 +1585,15 @@ async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { verify_post_join_filter_value(proto).await } -async fn roundtrip_verify_read_best_effort_filter_count( +async fn roundtrip_verify_read_filter_count( sql: &str, expected_count: u32, ) -> Result<()> { let ctx = create_context().await?; let proto = roundtrip_with_ctx(sql, ctx).await?; - // verify that the count of best-effort filters in read relations is as expected - assert_read_best_effort_filter_count(proto, expected_count).await + // verify that the count of filters in read relations is as expected + assert_read_filter_count(proto, expected_count).await } async fn roundtrip_all_types(sql: &str) -> Result<()> { From 60635fb83159f10763fc9bb5207ce4cac2a43f70 Mon Sep 17 00:00:00 2001 From: James Xia Date: Wed, 22 Jan 2025 16:49:41 -0800 Subject: [PATCH 08/11] Check filter types in TableScan.filters Use TableScan.source.supports_filters_pushdown() to determine if each filter in TableScan.filters should be included in ReadRel.filter or ReadRel.best_effort_filter --- .../substrait/src/logical_plan/producer.rs | 70 ++++++++--- .../tests/cases/roundtrip_logical_plan.rs | 117 ++++++++++++------ 2 files changed, 135 insertions(+), 52 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index a1f9ff1dda847..dddc3ed3a8ee7 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -56,7 +56,9 @@ use datafusion::logical_expr::expr::{ InSubquery, WindowFunction, WindowFunctionParams, }; use datafusion::logical_expr::utils::conjunction; -use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Operator}; +use datafusion::logical_expr::{ + expr, Between, JoinConstraint, LogicalPlan, Operator, TableProviderFilterPushDown, +}; use datafusion::prelude::Expr; use pbjson_types::Any as ProtoAny; use substrait::proto::exchange_rel::{ExchangeKind, RoundRobin, ScatterFields}; @@ -561,28 +563,62 @@ pub fn from_table_scan( let table_schema = scan.source.schema().to_dfschema_ref()?; let base_schema = to_substrait_named_struct(&table_schema)?; - let filter_option = if !scan.filters.is_empty() { - let table_schema_qualified = Arc::new( - DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - &(scan.source.schema()), - ) - .unwrap(), - ); - let combined_expr = conjunction(scan.filters.clone()).unwrap(); - let filter_expr = - producer.handle_expr(&combined_expr, &table_schema_qualified)?; - Some(Box::new(filter_expr)) - } else { - None - }; + let mut filter_option = None; + let mut best_effort_filter_option = None; + + if !scan.filters.is_empty() { + let mut full_filters = vec![]; + let mut partial_filters = vec![]; + let mut unsupported_filters = vec![]; + let filter_refs: Vec<&Expr> = scan.filters.iter().collect(); + + if let Ok(results) = scan.source.supports_filters_pushdown(&filter_refs) { + scan.filters + .iter() + .zip(results.iter()) + .for_each(|(x, res)| match res { + TableProviderFilterPushDown::Exact => full_filters.push(x.clone()), + TableProviderFilterPushDown::Inexact => { + partial_filters.push(x.clone()) + } + TableProviderFilterPushDown::Unsupported => { + unsupported_filters.push(x.clone()) + } + }); + } + + let table_schema_qualified = + Arc::new(if !full_filters.is_empty() || !partial_filters.is_empty() { + DFSchema::try_from_qualified_schema( + scan.table_name.clone(), + &(scan.source.schema()), + ) + .unwrap() + } else { + DFSchema::empty() + }); + + if !full_filters.is_empty() { + let combined_expr = conjunction(full_filters).unwrap(); + let filter_expr = + producer.handle_expr(&combined_expr, &table_schema_qualified)?; + filter_option = Some(Box::new(filter_expr)); + } + + if !partial_filters.is_empty() { + let combined_expr = conjunction(partial_filters).unwrap(); + let best_effort_filter_expr = + producer.handle_expr(&combined_expr, &table_schema_qualified)?; + best_effort_filter_option = Some(Box::new(best_effort_filter_expr)); + } + } Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, base_schema: Some(base_schema), filter: filter_option, - best_effort_filter: None, + best_effort_filter: best_effort_filter_option, projection, advanced_extension: None, read_type: Some(ReadType::NamedTable(NamedTable { diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index cf167d7ce334b..aab2d4116b9c0 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1236,9 +1236,9 @@ async fn roundtrip_repartition_hash() -> Result<()> { #[tokio::test] async fn roundtrip_read_filter() -> Result<()> { - roundtrip_verify_read_filter_count( + roundtrip_verify_read_filter_counts( "SELECT data.a FROM data JOIN data2 ON data.a = data2.a where data.a < 5 AND data2.e > 1", - 2, + 0, 2, ) .await } @@ -1328,52 +1328,79 @@ async fn verify_post_join_filter_value(proto: Box) -> Result<()> { Ok(()) } -fn count_read_filters(rel: &Rel, count: &mut u32) -> Result<()> { +fn count_read_filters( + rel: &Rel, + filter_count: &mut u32, + best_effort_filter_count: &mut u32, +) -> Result<()> { // search for target_rel and field value in proto match &rel.rel_type { Some(RelType::Read(read)) => { // increment counter for read filter if not None if read.filter.is_some() { - *count += 1; + *filter_count += 1; + } + if read.best_effort_filter.is_some() { + *best_effort_filter_count += 1; } Ok(()) } Some(RelType::Join(join)) => { - match count_read_filters(join.left.as_ref().unwrap().as_ref(), count) { + match count_read_filters( + join.left.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ) { Err(e) => Err(e), - Ok(_) => count_read_filters(join.right.as_ref().unwrap().as_ref(), count), + Ok(_) => count_read_filters( + join.right.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), } } - Some(RelType::Project(p)) => { - count_read_filters(p.input.as_ref().unwrap().as_ref(), count) - } - Some(RelType::Filter(filter)) => { - count_read_filters(filter.input.as_ref().unwrap().as_ref(), count) - } - Some(RelType::Fetch(fetch)) => { - count_read_filters(fetch.input.as_ref().unwrap().as_ref(), count) - } - Some(RelType::Sort(sort)) => { - count_read_filters(sort.input.as_ref().unwrap().as_ref(), count) - } - Some(RelType::Aggregate(agg)) => { - count_read_filters(agg.input.as_ref().unwrap().as_ref(), count) - } + Some(RelType::Project(p)) => count_read_filters( + p.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), + Some(RelType::Filter(filter)) => count_read_filters( + filter.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), + Some(RelType::Fetch(fetch)) => count_read_filters( + fetch.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), + Some(RelType::Sort(sort)) => count_read_filters( + sort.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), + Some(RelType::Aggregate(agg)) => count_read_filters( + agg.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), Some(RelType::Set(set)) => { for input in &set.inputs { - match count_read_filters(input, count) { + match count_read_filters(input, filter_count, best_effort_filter_count) { Err(e) => return Err(e), Ok(_) => continue, } } Ok(()) } - Some(RelType::ExtensionSingle(ext)) => { - count_read_filters(ext.input.as_ref().unwrap().as_ref(), count) - } + Some(RelType::ExtensionSingle(ext)) => count_read_filters( + ext.input.as_ref().unwrap().as_ref(), + filter_count, + best_effort_filter_count, + ), Some(RelType::ExtensionMulti(ext)) => { for input in &ext.inputs { - match count_read_filters(input, count) { + match count_read_filters(input, filter_count, best_effort_filter_count) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1388,19 +1415,32 @@ fn count_read_filters(rel: &Rel, count: &mut u32) -> Result<()> { } } -async fn assert_read_filter_count(proto: Box, expected_count: u32) -> Result<()> { - let mut count: u32 = 0; +async fn assert_read_filter_counts( + proto: Box, + expected_filter_count: u32, + expected_best_effort_filter_count: u32, +) -> Result<()> { + let mut filter_count: u32 = 0; + let mut best_effort_filter_count: u32 = 0; for relation in &proto.relations { match relation.rel_type.as_ref() { Some(rt) => match rt { plan_rel::RelType::Rel(rel) => { - match count_read_filters(rel, &mut count) { + match count_read_filters( + rel, + &mut filter_count, + &mut best_effort_filter_count, + ) { Err(e) => return Err(e), Ok(_) => continue, } } plan_rel::RelType::Root(root) => { - match count_read_filters(root.input.as_ref().unwrap(), &mut count) { + match count_read_filters( + root.input.as_ref().unwrap(), + &mut filter_count, + &mut best_effort_filter_count, + ) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1410,7 +1450,8 @@ async fn assert_read_filter_count(proto: Box, expected_count: u32) -> Resu } } - assert_eq!(expected_count, count); + assert_eq!(expected_filter_count, filter_count); + assert_eq!(expected_best_effort_filter_count, best_effort_filter_count); Ok(()) } @@ -1585,15 +1626,21 @@ async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { verify_post_join_filter_value(proto).await } -async fn roundtrip_verify_read_filter_count( +async fn roundtrip_verify_read_filter_counts( sql: &str, - expected_count: u32, + expected_filter_count: u32, + expected_best_effort_filter_count: u32, ) -> Result<()> { let ctx = create_context().await?; let proto = roundtrip_with_ctx(sql, ctx).await?; - // verify that the count of filters in read relations is as expected - assert_read_filter_count(proto, expected_count).await + // verify that filter counts in read relations are as expected + assert_read_filter_counts( + proto, + expected_filter_count, + expected_best_effort_filter_count, + ) + .await } async fn roundtrip_all_types(sql: &str) -> Result<()> { From 467aaaa172e37baa3d3fc81745d3600e66450a75 Mon Sep 17 00:00:00 2001 From: James Xia Date: Mon, 3 Feb 2025 17:56:12 -0800 Subject: [PATCH 09/11] Propagate Substrait ReadRel filter to consumer --- .../substrait/src/logical_plan/consumer.rs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index ffeff3e9df47f..7852afaa4f3a3 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -1327,19 +1327,46 @@ pub async fn from_read_rel( table_ref: TableReference, schema: DFSchema, projection: &Option, + filter: &Option>, + best_effort_filter: &Option>, ) -> Result { let schema = schema.replace_qualifier(table_ref.clone()); + let mut filters = vec![]; + if filter.is_some() { + let filter_expr = consumer + .consume_expression(&(filter.clone().unwrap()), &schema) + .await?; + filters.append( + &mut split_conjunction(&filter_expr) + .into_iter() + .cloned() + .collect(), + ); + } + if best_effort_filter.is_some() { + let best_effort_filter_expr = consumer + .consume_expression(&(best_effort_filter.clone().unwrap()), &schema) + .await?; + filters.append( + &mut split_conjunction(&best_effort_filter_expr) + .into_iter() + .cloned() + .collect(), + ); + } + let plan = { let provider = match consumer.resolve_table_ref(&table_ref).await? { Some(ref provider) => Arc::clone(provider), _ => return plan_err!("No table named '{table_ref}'"), }; - LogicalPlanBuilder::scan( + LogicalPlanBuilder::scan_with_filters( table_ref, provider_as_source(Arc::clone(&provider)), None, + filters, )? .build()? }; @@ -1382,6 +1409,8 @@ pub async fn from_read_rel( table_reference, substrait_schema, &read.projection, + &read.filter, + &read.best_effort_filter, ) .await } @@ -1464,6 +1493,8 @@ pub async fn from_read_rel( table_reference, substrait_schema, &read.projection, + &read.filter, + &read.best_effort_filter, ) .await } From ab41a1bd9c3a83f50c72c1c9d107cfe27ea82fcc Mon Sep 17 00:00:00 2001 From: James Xia Date: Fri, 14 Feb 2025 11:04:12 -0800 Subject: [PATCH 10/11] Address PR comments --- .../substrait/src/logical_plan/consumer.rs | 43 +++++------- .../substrait/src/logical_plan/producer.rs | 28 ++++---- .../tests/cases/roundtrip_logical_plan.rs | 69 +------------------ 3 files changed, 32 insertions(+), 108 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 7852afaa4f3a3..9ac4a1c66c8f6 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -68,8 +68,8 @@ use datafusion::logical_expr::{ }; use datafusion::prelude::{lit, JoinType}; use datafusion::{ - arrow, error::Result, logical_expr::utils::split_conjunction, prelude::Column, - scalar::ScalarValue, + arrow, error::Result, logical_expr::utils::split_conjunction, + logical_expr::utils::split_conjunction_owned, prelude::Column, scalar::ScalarValue, }; use std::collections::HashSet; use std::sync::Arc; @@ -1332,29 +1332,20 @@ pub async fn from_read_rel( ) -> Result { let schema = schema.replace_qualifier(table_ref.clone()); - let mut filters = vec![]; - if filter.is_some() { - let filter_expr = consumer - .consume_expression(&(filter.clone().unwrap()), &schema) - .await?; - filters.append( - &mut split_conjunction(&filter_expr) - .into_iter() - .cloned() - .collect(), - ); - } - if best_effort_filter.is_some() { - let best_effort_filter_expr = consumer - .consume_expression(&(best_effort_filter.clone().unwrap()), &schema) - .await?; - filters.append( - &mut split_conjunction(&best_effort_filter_expr) - .into_iter() - .cloned() - .collect(), - ); - } + let filters = if let Some(f) = filter { + let filter_expr = consumer.consume_expression(&(f.clone()), &schema).await?; + split_conjunction_owned(filter_expr) + } else { + vec![] + }; + + let best_effort_filters = if let Some(bef) = best_effort_filter { + let best_effort_filter_expr = + consumer.consume_expression(&(bef.clone()), &schema).await?; + split_conjunction_owned(best_effort_filter_expr) + } else { + vec![] + }; let plan = { let provider = match consumer.resolve_table_ref(&table_ref).await? { @@ -1366,7 +1357,7 @@ pub async fn from_read_rel( table_ref, provider_as_source(Arc::clone(&provider)), None, - filters, + [filters, best_effort_filters].concat(), )? .build()? }; diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index dddc3ed3a8ee7..9308626a03568 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -567,9 +567,8 @@ pub fn from_table_scan( let mut best_effort_filter_option = None; if !scan.filters.is_empty() { - let mut full_filters = vec![]; - let mut partial_filters = vec![]; - let mut unsupported_filters = vec![]; + let mut exact_filters = vec![]; + let mut inexact_filters = vec![]; let filter_refs: Vec<&Expr> = scan.filters.iter().collect(); if let Ok(results) = scan.source.supports_filters_pushdown(&filter_refs) { @@ -577,18 +576,16 @@ pub fn from_table_scan( .iter() .zip(results.iter()) .for_each(|(x, res)| match res { - TableProviderFilterPushDown::Exact => full_filters.push(x.clone()), + TableProviderFilterPushDown::Exact => exact_filters.push(x.clone()), TableProviderFilterPushDown::Inexact => { - partial_filters.push(x.clone()) - } - TableProviderFilterPushDown::Unsupported => { - unsupported_filters.push(x.clone()) + inexact_filters.push(x.clone()) } + _ => {} }); } - let table_schema_qualified = - Arc::new(if !full_filters.is_empty() || !partial_filters.is_empty() { + let table_schema_qualified = Arc::new( + if !exact_filters.is_empty() || !inexact_filters.is_empty() { DFSchema::try_from_qualified_schema( scan.table_name.clone(), &(scan.source.schema()), @@ -596,17 +593,18 @@ pub fn from_table_scan( .unwrap() } else { DFSchema::empty() - }); + }, + ); - if !full_filters.is_empty() { - let combined_expr = conjunction(full_filters).unwrap(); + if !exact_filters.is_empty() { + let combined_expr = conjunction(exact_filters).unwrap(); let filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; filter_option = Some(Box::new(filter_expr)); } - if !partial_filters.is_empty() { - let combined_expr = conjunction(partial_filters).unwrap(); + if !inexact_filters.is_empty() { + let combined_expr = conjunction(inexact_filters).unwrap(); let best_effort_filter_expr = producer.handle_expr(&combined_expr, &table_schema_qualified)?; best_effort_filter_option = Some(Box::new(best_effort_filter_expr)); diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index aab2d4116b9c0..7e75c659f562a 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1236,11 +1236,7 @@ async fn roundtrip_repartition_hash() -> Result<()> { #[tokio::test] async fn roundtrip_read_filter() -> Result<()> { - roundtrip_verify_read_filter_counts( - "SELECT data.a FROM data JOIN data2 ON data.a = data2.a where data.a < 5 AND data2.e > 1", - 0, 2, - ) - .await + roundtrip_verify_read_filter_counts("SELECT a FROM data where a < 5", 0, 1).await } fn check_post_join_filters(rel: &Rel) -> Result<()> { @@ -1345,73 +1341,12 @@ fn count_read_filters( } Ok(()) } - Some(RelType::Join(join)) => { - match count_read_filters( - join.left.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ) { - Err(e) => Err(e), - Ok(_) => count_read_filters( - join.right.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), - } - } - Some(RelType::Project(p)) => count_read_filters( - p.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), Some(RelType::Filter(filter)) => count_read_filters( filter.input.as_ref().unwrap().as_ref(), filter_count, best_effort_filter_count, ), - Some(RelType::Fetch(fetch)) => count_read_filters( - fetch.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), - Some(RelType::Sort(sort)) => count_read_filters( - sort.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), - Some(RelType::Aggregate(agg)) => count_read_filters( - agg.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), - Some(RelType::Set(set)) => { - for input in &set.inputs { - match count_read_filters(input, filter_count, best_effort_filter_count) { - Err(e) => return Err(e), - Ok(_) => continue, - } - } - Ok(()) - } - Some(RelType::ExtensionSingle(ext)) => count_read_filters( - ext.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), - Some(RelType::ExtensionMulti(ext)) => { - for input in &ext.inputs { - match count_read_filters(input, filter_count, best_effort_filter_count) { - Err(e) => return Err(e), - Ok(_) => continue, - } - } - Ok(()) - } - Some(RelType::ExtensionLeaf(_)) => Ok(()), - _ => not_impl_err!( - "Unexpected RelType: {:?} in read filter check", - rel.rel_type - ), + _ => Ok(()), } } From f803557e8028a0bc57751092e9041b7d5c27e03a Mon Sep 17 00:00:00 2001 From: James Xia Date: Sat, 1 Mar 2025 08:39:57 -0800 Subject: [PATCH 11/11] Propagate TableScan filters to ReadRel filter --- .../substrait/src/logical_plan/consumer.rs | 15 +---- .../substrait/src/logical_plan/producer.rs | 63 +++++-------------- .../tests/cases/roundtrip_logical_plan.rs | 41 +++--------- 3 files changed, 26 insertions(+), 93 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 9ac4a1c66c8f6..7c6c45f44db7e 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -1328,25 +1328,16 @@ pub async fn from_read_rel( schema: DFSchema, projection: &Option, filter: &Option>, - best_effort_filter: &Option>, ) -> Result { let schema = schema.replace_qualifier(table_ref.clone()); let filters = if let Some(f) = filter { - let filter_expr = consumer.consume_expression(&(f.clone()), &schema).await?; + let filter_expr = consumer.consume_expression(f, &schema).await?; split_conjunction_owned(filter_expr) } else { vec![] }; - let best_effort_filters = if let Some(bef) = best_effort_filter { - let best_effort_filter_expr = - consumer.consume_expression(&(bef.clone()), &schema).await?; - split_conjunction_owned(best_effort_filter_expr) - } else { - vec![] - }; - let plan = { let provider = match consumer.resolve_table_ref(&table_ref).await? { Some(ref provider) => Arc::clone(provider), @@ -1357,7 +1348,7 @@ pub async fn from_read_rel( table_ref, provider_as_source(Arc::clone(&provider)), None, - [filters, best_effort_filters].concat(), + filters, )? .build()? }; @@ -1401,7 +1392,6 @@ pub async fn from_read_rel( substrait_schema, &read.projection, &read.filter, - &read.best_effort_filter, ) .await } @@ -1485,7 +1475,6 @@ pub async fn from_read_rel( substrait_schema, &read.projection, &read.filter, - &read.best_effort_filter, ) .await } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 9308626a03568..67fa5aed8da62 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -56,9 +56,7 @@ use datafusion::logical_expr::expr::{ InSubquery, WindowFunction, WindowFunctionParams, }; use datafusion::logical_expr::utils::conjunction; -use datafusion::logical_expr::{ - expr, Between, JoinConstraint, LogicalPlan, Operator, TableProviderFilterPushDown, -}; +use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Operator}; use datafusion::prelude::Expr; use pbjson_types::Any as ProtoAny; use substrait::proto::exchange_rel::{ExchangeKind, RoundRobin, ScatterFields}; @@ -563,60 +561,29 @@ pub fn from_table_scan( let table_schema = scan.source.schema().to_dfschema_ref()?; let base_schema = to_substrait_named_struct(&table_schema)?; - let mut filter_option = None; - let mut best_effort_filter_option = None; - - if !scan.filters.is_empty() { - let mut exact_filters = vec![]; - let mut inexact_filters = vec![]; - let filter_refs: Vec<&Expr> = scan.filters.iter().collect(); - - if let Ok(results) = scan.source.supports_filters_pushdown(&filter_refs) { - scan.filters - .iter() - .zip(results.iter()) - .for_each(|(x, res)| match res { - TableProviderFilterPushDown::Exact => exact_filters.push(x.clone()), - TableProviderFilterPushDown::Inexact => { - inexact_filters.push(x.clone()) - } - _ => {} - }); - } - + let filter_option = if scan.filters.is_empty() { + None + } else { let table_schema_qualified = Arc::new( - if !exact_filters.is_empty() || !inexact_filters.is_empty() { - DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - &(scan.source.schema()), - ) - .unwrap() - } else { - DFSchema::empty() - }, + DFSchema::try_from_qualified_schema( + scan.table_name.clone(), + &(scan.source.schema()), + ) + .unwrap(), ); - if !exact_filters.is_empty() { - let combined_expr = conjunction(exact_filters).unwrap(); - let filter_expr = - producer.handle_expr(&combined_expr, &table_schema_qualified)?; - filter_option = Some(Box::new(filter_expr)); - } - - if !inexact_filters.is_empty() { - let combined_expr = conjunction(inexact_filters).unwrap(); - let best_effort_filter_expr = - producer.handle_expr(&combined_expr, &table_schema_qualified)?; - best_effort_filter_option = Some(Box::new(best_effort_filter_expr)); - } - } + let combined_expr = conjunction(scan.filters.clone()).unwrap(); + let filter_expr = + producer.handle_expr(&combined_expr, &table_schema_qualified)?; + Some(Box::new(filter_expr)) + }; Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, base_schema: Some(base_schema), filter: filter_option, - best_effort_filter: best_effort_filter_option, + best_effort_filter: None, projection, advanced_extension: None, read_type: Some(ReadType::NamedTable(NamedTable { diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 7e75c659f562a..f989d05c80dd1 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1236,7 +1236,7 @@ async fn roundtrip_repartition_hash() -> Result<()> { #[tokio::test] async fn roundtrip_read_filter() -> Result<()> { - roundtrip_verify_read_filter_counts("SELECT a FROM data where a < 5", 0, 1).await + roundtrip_verify_read_filter_count("SELECT a FROM data where a < 5", 1).await } fn check_post_join_filters(rel: &Rel) -> Result<()> { @@ -1324,11 +1324,7 @@ async fn verify_post_join_filter_value(proto: Box) -> Result<()> { Ok(()) } -fn count_read_filters( - rel: &Rel, - filter_count: &mut u32, - best_effort_filter_count: &mut u32, -) -> Result<()> { +fn count_read_filters(rel: &Rel, filter_count: &mut u32) -> Result<()> { // search for target_rel and field value in proto match &rel.rel_type { Some(RelType::Read(read)) => { @@ -1336,36 +1332,25 @@ fn count_read_filters( if read.filter.is_some() { *filter_count += 1; } - if read.best_effort_filter.is_some() { - *best_effort_filter_count += 1; - } Ok(()) } - Some(RelType::Filter(filter)) => count_read_filters( - filter.input.as_ref().unwrap().as_ref(), - filter_count, - best_effort_filter_count, - ), + Some(RelType::Filter(filter)) => { + count_read_filters(filter.input.as_ref().unwrap().as_ref(), filter_count) + } _ => Ok(()), } } -async fn assert_read_filter_counts( +async fn assert_read_filter_count( proto: Box, expected_filter_count: u32, - expected_best_effort_filter_count: u32, ) -> Result<()> { let mut filter_count: u32 = 0; - let mut best_effort_filter_count: u32 = 0; for relation in &proto.relations { match relation.rel_type.as_ref() { Some(rt) => match rt { plan_rel::RelType::Rel(rel) => { - match count_read_filters( - rel, - &mut filter_count, - &mut best_effort_filter_count, - ) { + match count_read_filters(rel, &mut filter_count) { Err(e) => return Err(e), Ok(_) => continue, } @@ -1374,7 +1359,6 @@ async fn assert_read_filter_counts( match count_read_filters( root.input.as_ref().unwrap(), &mut filter_count, - &mut best_effort_filter_count, ) { Err(e) => return Err(e), Ok(_) => continue, @@ -1386,7 +1370,6 @@ async fn assert_read_filter_counts( } assert_eq!(expected_filter_count, filter_count); - assert_eq!(expected_best_effort_filter_count, best_effort_filter_count); Ok(()) } @@ -1561,21 +1544,15 @@ async fn roundtrip_verify_post_join_filter(sql: &str) -> Result<()> { verify_post_join_filter_value(proto).await } -async fn roundtrip_verify_read_filter_counts( +async fn roundtrip_verify_read_filter_count( sql: &str, expected_filter_count: u32, - expected_best_effort_filter_count: u32, ) -> Result<()> { let ctx = create_context().await?; let proto = roundtrip_with_ctx(sql, ctx).await?; // verify that filter counts in read relations are as expected - assert_read_filter_counts( - proto, - expected_filter_count, - expected_best_effort_filter_count, - ) - .await + assert_read_filter_count(proto, expected_filter_count).await } async fn roundtrip_all_types(sql: &str) -> Result<()> {