From 8e4c5e0552e46b57f7aec330b1cd98d2e626721e Mon Sep 17 00:00:00 2001 From: DaltonModlin Date: Mon, 29 Aug 2022 13:07:43 -0600 Subject: [PATCH 1/3] Execute LogicalPlans after building for TPCH Benchmarks - tpch.rs::benchmark_datafusion now executes LogicalPlan immediately after building - tpch.rs::run_query now executes LogicalPlan immediately after building Resolves #3273 --- benchmarks/src/bin/tpch.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 08374bfcc5b21..c874ef09ab8fe 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -196,10 +196,12 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result = Vec::with_capacity(1); for i in 0..opt.iterations { let start = Instant::now(); - let plans = create_logical_plans(&ctx, opt.query)?; - for plan in plans { - result = execute_query(&ctx, &plan, opt.debug).await?; + + let sql = get_query_sql(opt.query)?; + for query in sql { + result = ctx.sql(&query).await?.collect().await?; } + let elapsed = start.elapsed().as_secs_f64() * 1000.0; millis.push(elapsed as f64); let row_count = result.iter().map(|b| b.num_rows()).sum(); @@ -253,7 +255,7 @@ fn get_query_sql(query: usize) -> Result> { .map(|s| s.trim()) .filter(|s| !s.is_empty()) .map(|s| s.to_string()) - .collect()) + .collect()); } Err(e) => errors.push(format!("{}: {}", filename, e)), }; @@ -357,7 +359,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { return Err(DataFusionError::NotImplemented(format!( "Invalid compression format: {}", other - ))) + ))); } }; let props = WriterProperties::builder() @@ -369,7 +371,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { return Err(DataFusionError::NotImplemented(format!( "Invalid output format: {}", other - ))) + ))); } } println!("Conversion completed in {} ms", start.elapsed().as_millis()); @@ -1022,9 +1024,10 @@ mod tests { ctx.register_table(table, Arc::new(provider))?; } - let plans = create_logical_plans(&ctx, n)?; - for plan in plans { - execute_query(&ctx, &plan, false).await?; + let sql = &get_query_sql(n)?; + + for query in sql { + ctx.sql(query).await?.collect().await?; } Ok(()) From ed5dd14ad67f25ccd84d802fd767ae86ecdd4589 Mon Sep 17 00:00:00 2001 From: DaltonModlin Date: Mon, 29 Aug 2022 16:07:24 -0600 Subject: [PATCH 2/3] PR fixes - Updated execute_query to accept sql string instead of LogicalPlan as second argument - Updated tpch.rs::run_query to use tpch.rs::execute_query - Updated tpch.rs::benchmark_datafusion to use tpch.rs::execute_query --- benchmarks/src/bin/tpch.rs | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index c874ef09ab8fe..6cf0d6aa142ed 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -197,9 +197,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result> { } } -/// Create a logical plan for each query in the specified query file -fn create_logical_plans(ctx: &SessionContext, query: usize) -> Result> { - let sql = get_query_sql(query)?; - sql.iter() - .map(|sql| ctx.create_logical_plan(sql.as_str())) - .collect::>>() -} - async fn execute_query( ctx: &SessionContext, - plan: &LogicalPlan, + sql: &String, debug: bool, ) -> Result> { + let plan = ctx.sql(sql).await?; + let plan = plan.to_logical_plan()?; + if debug { println!("=== Logical plan ===\n{:?}\n", plan); } - let plan = ctx.optimize(plan)?; + let plan = ctx.optimize(&plan)?; if debug { println!("=== Optimized logical plan ===\n{:?}\n", plan); } @@ -1025,9 +1020,8 @@ mod tests { } let sql = &get_query_sql(n)?; - for query in sql { - ctx.sql(query).await?.collect().await?; + execute_query(&ctx, query, debug); } Ok(()) From 1f83b90d622ab11149f128f136527f9ef51e4ef6 Mon Sep 17 00:00:00 2001 From: DaltonModlin Date: Mon, 29 Aug 2022 16:51:02 -0600 Subject: [PATCH 3/3] Resolved pipeline errors - Removed unused include of LogicalPlan module in tpch.rs --- benchmarks/src/bin/tpch.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 6cf0d6aa142ed..43db654e83f9f 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -28,7 +28,6 @@ use std::{ use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_plan::LogicalPlan; use datafusion::parquet::basic::Compression; use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_plan::display::DisplayableExecutionPlan; @@ -197,9 +196,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result> { async fn execute_query( ctx: &SessionContext, - sql: &String, + sql: &str, debug: bool, ) -> Result> { let plan = ctx.sql(sql).await?; @@ -1021,7 +1020,7 @@ mod tests { let sql = &get_query_sql(n)?; for query in sql { - execute_query(&ctx, query, debug); + execute_query(&ctx, query, false).await?; } Ok(())