From bd9c018971a766c2d5664b39c64538ab0038e023 Mon Sep 17 00:00:00 2001 From: zhongjingxiong Date: Wed, 1 Nov 2023 14:22:37 +0800 Subject: [PATCH 1/4] Fix: Optimizer rule 'common_sub_expression_eliminate' failed --- datafusion-examples/examples/dataframe_subquery.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/examples/dataframe_subquery.rs b/datafusion-examples/examples/dataframe_subquery.rs index 94049e59b3ab8..a938ee062bc4b 100644 --- a/datafusion-examples/examples/dataframe_subquery.rs +++ b/datafusion-examples/examples/dataframe_subquery.rs @@ -16,6 +16,7 @@ // under the License. use std::sync::Arc; +use arrow_schema::DataType; use datafusion::error::Result; use datafusion::prelude::*; @@ -46,7 +47,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { scalar_subquery(Arc::new( ctx.table("t2") .await? - .filter(col("t1.c1").eq(col("t2.c1")))? + .filter(out_ref_col(DataType::Utf8,"t1.c1").eq(col("t2.c1")))? .aggregate(vec![], vec![avg(col("t2.c2"))])? .select(vec![avg(col("t2.c2"))])? .into_unoptimized_plan(), @@ -89,7 +90,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> { .filter(exists(Arc::new( ctx.table("t2") .await? - .filter(col("t1.c1").eq(col("t2.c1")))? + .filter(out_ref_col(DataType::Utf8,"t1.c1").eq(col("t2.c1")))? .select(vec![col("t2.c2")])? .into_unoptimized_plan(), )))? From f06c81dbbb5d52b1453fadeda6eda2ba2005f832 Mon Sep 17 00:00:00 2001 From: zhongjingxiong Date: Wed, 1 Nov 2023 14:35:37 +0800 Subject: [PATCH 2/4] nit --- datafusion-examples/examples/dataframe_subquery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/examples/dataframe_subquery.rs b/datafusion-examples/examples/dataframe_subquery.rs index a938ee062bc4b..25925bfc9bdd6 100644 --- a/datafusion-examples/examples/dataframe_subquery.rs +++ b/datafusion-examples/examples/dataframe_subquery.rs @@ -47,7 +47,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { scalar_subquery(Arc::new( ctx.table("t2") .await? - .filter(out_ref_col(DataType::Utf8,"t1.c1").eq(col("t2.c1")))? + .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))? .aggregate(vec![], vec![avg(col("t2.c2"))])? .select(vec![avg(col("t2.c2"))])? .into_unoptimized_plan(), @@ -90,7 +90,7 @@ async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> { .filter(exists(Arc::new( ctx.table("t2") .await? - .filter(out_ref_col(DataType::Utf8,"t1.c1").eq(col("t2.c1")))? + .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))? .select(vec![col("t2.c2")])? .into_unoptimized_plan(), )))? From f11a0f03da56c157d33fc813a823e12bf6b3d1f0 Mon Sep 17 00:00:00 2001 From: zhongjingxiong Date: Wed, 1 Nov 2023 14:49:51 +0800 Subject: [PATCH 3/4] nit --- datafusion-examples/examples/dataframe_subquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/dataframe_subquery.rs b/datafusion-examples/examples/dataframe_subquery.rs index 25925bfc9bdd6..caec0b6683cc4 100644 --- a/datafusion-examples/examples/dataframe_subquery.rs +++ b/datafusion-examples/examples/dataframe_subquery.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; use arrow_schema::DataType; +use std::sync::Arc; use datafusion::error::Result; use datafusion::prelude::*; From 98dfcb37ef45af988736161e2cb0d8aa52731b35 Mon Sep 17 00:00:00 2001 From: zhongjingxiong Date: Wed, 1 Nov 2023 16:12:48 +0800 Subject: [PATCH 4/4] nit --- datafusion-examples/examples/dataframe_subquery.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/dataframe_subquery.rs b/datafusion-examples/examples/dataframe_subquery.rs index caec0b6683cc4..9fb61008b9f69 100644 --- a/datafusion-examples/examples/dataframe_subquery.rs +++ b/datafusion-examples/examples/dataframe_subquery.rs @@ -39,7 +39,7 @@ async fn main() -> Result<()> { Ok(()) } -//select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 10; +//select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 3; async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await? @@ -61,7 +61,7 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { Ok(()) } -//SELECT t1.c1, t1.c2 FROM t1 WHERE t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 10 +//SELECT t1.c1, t1.c2 FROM t1 WHERE t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 3; async fn where_in_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await? @@ -83,7 +83,7 @@ async fn where_in_subquery(ctx: &SessionContext) -> Result<()> { Ok(()) } -//SELECT t1.c1, t1.c2 FROM t1 WHERE EXISTS (select t2.c2 from t2 where t1.c1 = t2.c1) limit 10 +//SELECT t1.c1, t1.c2 FROM t1 WHERE EXISTS (select t2.c2 from t2 where t1.c1 = t2.c1) limit 3; async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await?