From a29799fb6b8a9944c80d479406e763a7b6631f58 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 May 2022 07:10:44 -0600 Subject: [PATCH 1/4] add scan_empty method to tests --- datafusion/core/src/test_util.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 1dd36316b797b..3e174e9dc67b7 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -20,7 +20,10 @@ use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; +use crate::datasource::empty::EmptyTable; +use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::DataFusionError; /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record @@ -232,6 +235,17 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result, + table_schema: &Schema, + projection: Option>, +) -> Result { + let table_schema = Arc::new(table_schema.clone()); + let provider = Arc::new(EmptyTable::new(table_schema)); + LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) +} + /// Get the schema for the aggregate_test_* csv files pub fn aggr_test_schema() -> SchemaRef { let mut f1 = Field::new("c1", DataType::Utf8, false); From dadb0fcbfaacc5a9e9005928916772b1e044ed04 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 May 2022 07:19:08 -0600 Subject: [PATCH 2/4] update tests to use new scan_empty test method --- datafusion/core/src/logical_plan/builder.rs | 89 ++++++++----------- datafusion/core/src/logical_plan/plan.rs | 25 +++--- .../src/optimizer/projection_push_down.rs | 10 +-- .../src/optimizer/simplify_expressions.rs | 3 +- datafusion/core/src/physical_plan/planner.rs | 14 +-- datafusion/core/src/test/mod.rs | 6 +- datafusion/core/tests/sql/aggregates.rs | 5 +- datafusion/core/tests/sql/explain.rs | 5 +- datafusion/core/tests/sql/projection.rs | 3 +- 9 files changed, 73 insertions(+), 87 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 739d194949f3f..cb9107e540327 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -951,20 +951,18 @@ mod tests { use crate::logical_plan::StringifiedPlan; use crate::prelude::*; use crate::test::test_table_scan_with_name; + use crate::test_util::scan_empty; use super::super::{col, lit, sum}; use super::*; #[test] fn plan_builder_simple() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![0, 3]), - )? - .filter(col("state").eq(lit("CO")))? - .project(vec![col("id")])? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? + .filter(col("state").eq(lit("CO")))? + .project(vec![col("id")])? + .build()?; let expected = "Projection: #employee_csv.id\ \n Filter: #employee_csv.state = Utf8(\"CO\")\ @@ -978,8 +976,7 @@ mod tests { #[test] fn plan_builder_schema() { let schema = employee_schema(); - let plan = - LogicalPlanBuilder::scan_empty(Some("employee_csv"), &schema, None).unwrap(); + let plan = scan_empty(Some("employee_csv"), &schema, None).unwrap(); let expected = DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap(); @@ -989,19 +986,16 @@ mod tests { #[test] fn plan_builder_aggregate() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )? - .aggregate( - vec![col("state")], - vec![sum(col("salary")).alias("total_salary")], - )? - .project(vec![col("state"), col("total_salary")])? - .limit(10)? - .offset(2)? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + .aggregate( + vec![col("state")], + vec![sum(col("salary")).alias("total_salary")], + )? + .project(vec![col("state"), col("total_salary")])? + .limit(10)? + .offset(2)? + .build()?; let expected = "Offset: 2\ \n Limit: 10\ @@ -1016,24 +1010,21 @@ mod tests { #[test] fn plan_builder_sort() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )? - .sort(vec![ - Expr::Sort { - expr: Box::new(col("state")), - asc: true, - nulls_first: true, - }, - Expr::Sort { - expr: Box::new(col("salary")), - asc: false, - nulls_first: false, - }, - ])? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + .sort(vec![ + Expr::Sort { + expr: Box::new(col("state")), + asc: true, + nulls_first: true, + }, + Expr::Sort { + expr: Box::new(col("salary")), + asc: false, + nulls_first: false, + }, + ])? + .build()?; let expected = "Sort: #employee_csv.state ASC NULLS FIRST, #employee_csv.salary DESC NULLS LAST\ \n TableScan: employee_csv projection=Some([3, 4])"; @@ -1045,10 +1036,9 @@ mod tests { #[test] fn plan_using_join_wildcard_projection() -> Result<()> { - let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)? - .build()?; + let t2 = scan_empty(Some("t2"), &employee_schema(), None)?.build()?; - let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)? + let plan = scan_empty(Some("t1"), &employee_schema(), None)? .join_using(&t2, JoinType::Inner, vec!["id"])? .project(vec![Expr::Wildcard])? .build()?; @@ -1066,11 +1056,8 @@ mod tests { #[test] fn plan_builder_union_combined_single_union() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; let plan = plan .union(plan.build()?)? @@ -1167,7 +1154,7 @@ mod tests { #[test] fn projection_non_unique_names() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( + let plan = scan_empty( Some("employee_csv"), &employee_schema(), // project id and first_name by column index @@ -1193,7 +1180,7 @@ mod tests { #[test] fn aggregate_non_unique_names() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( + let plan = scan_empty( Some("employee_csv"), &employee_schema(), // project state and salary by column index diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index f12981707f86a..1dfda1a8665e3 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -100,8 +100,9 @@ pub fn source_as_provider( #[cfg(test)] mod tests { - use super::super::{col, lit, LogicalPlanBuilder}; + use super::super::{col, lit}; use super::*; + use crate::test_util::scan_empty; use arrow::datatypes::{DataType, Field, Schema}; fn employee_schema() -> Schema { @@ -115,18 +116,14 @@ mod tests { } fn display_plan() -> LogicalPlan { - LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![0, 3]), - ) - .unwrap() - .filter(col("state").eq(lit("CO"))) - .unwrap() - .project(vec![col("id")]) - .unwrap() - .build() - .unwrap() + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3])) + .unwrap() + .filter(col("state").eq(lit("CO"))) + .unwrap() + .project(vec![col("id")]) + .unwrap() + .build() + .unwrap() } #[test] @@ -424,7 +421,7 @@ mod tests { Field::new("state", DataType::Utf8, false), ]); - LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1])) + scan_empty(None, &schema, Some(vec![0, 1])) .unwrap() .filter(col("state").eq(lit("CO"))) .unwrap() diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 20b8f683dfb0b..cf14adcd18035 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -517,6 +517,7 @@ mod tests { col, exprlist_to_fields, lit, max, min, Expr, JoinType, LogicalPlanBuilder, }; use crate::test::*; + use crate::test_util::scan_empty; use arrow::datatypes::DataType; #[test] @@ -646,8 +647,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))? @@ -688,8 +688,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))? @@ -732,8 +731,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join_using(&table2_scan, JoinType::Left, vec!["a"])? diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index e9694ebc528cb..216af223a07c5 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -749,6 +749,7 @@ mod tests { }; use crate::physical_plan::functions::make_scalar_function; use crate::physical_plan::udf::ScalarUDF; + use crate::test_util::scan_empty; #[test] fn test_simplify_or_true() { @@ -1508,7 +1509,7 @@ mod tests { Field::new("c", DataType::Boolean, false), Field::new("d", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some("test"), &schema, None) + scan_empty(Some("test"), &schema, None) .expect("creating scan") .build() .expect("building plan") diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 911f9f67f5736..27dd46b072d4b 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1543,6 +1543,7 @@ mod tests { }; use crate::prelude::{SessionConfig, SessionContext}; use crate::scalar::ScalarValue; + use crate::test_util::scan_empty; use crate::{ logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream, }; @@ -1856,13 +1857,12 @@ mod tests { async fn test_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); - let logical_plan = - LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) - .unwrap() - .explain(true, false) - .unwrap() - .build() - .unwrap(); + let logical_plan = scan_empty(Some("employee"), &schema, None) + .unwrap() + .explain(true, false) + .unwrap() + .build() + .unwrap(); let plan = plan(&logical_plan).await.unwrap(); if let Some(plan) = plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index c8523a52e44d7..1798232b3fdfc 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -21,9 +21,9 @@ use crate::arrow::array::UInt32Array; use crate::datasource::{listing::local_unpartitioned_file, MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; -use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; +use crate::logical_plan::LogicalPlan; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; -use crate::test_util::aggr_test_schema; +use crate::test_util::{aggr_test_schema, scan_empty}; use array::{Array, ArrayRef}; use arrow::array::{self, DecimalBuilder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -132,7 +132,7 @@ pub fn test_table_scan_with_name(name: &str) -> Result { Field::new("b", DataType::UInt32, false), Field::new("c", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some(name), &schema, None)?.build() + scan_empty(Some(name), &schema, None)?.build() } /// some tests share a common table diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 42999a743bbd2..08ccbe4530427 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -16,7 +16,8 @@ // under the License. use super::*; -use datafusion::{logical_plan::LogicalPlanBuilder, scalar::ScalarValue}; +use datafusion::scalar::ScalarValue; +use datafusion::test_util::scan_empty; #[tokio::test] async fn csv_query_avg_multi_batch() -> Result<()> { @@ -1479,7 +1480,7 @@ async fn aggregate_with_alias() -> Result<()> { Field::new("c2", DataType::UInt32, false), ])); - let plan = LogicalPlanBuilder::scan_empty(None, schema.as_ref(), None)? + let plan = scan_empty(None, schema.as_ref(), None)? .aggregate(vec![col("c1")], vec![sum(col("c2"))])? .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])? .build()?; diff --git a/datafusion/core/tests/sql/explain.rs b/datafusion/core/tests/sql/explain.rs index b85228016e507..97e7e6e761237 100644 --- a/datafusion/core/tests/sql/explain.rs +++ b/datafusion/core/tests/sql/explain.rs @@ -16,8 +16,9 @@ // under the License. use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::test_util::scan_empty; use datafusion::{ - logical_plan::{LogicalPlan, LogicalPlanBuilder, PlanType}, + logical_plan::{LogicalPlan, PlanType}, prelude::SessionContext, }; @@ -25,7 +26,7 @@ use datafusion::{ fn optimize_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); - let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) + let plan = scan_empty(Some("employee"), &schema, None) .unwrap() .explain(true, false) .unwrap() diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 0b7ce860eccca..e1b1742bfa2da 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -16,6 +16,7 @@ // under the License. use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use datafusion::test_util::scan_empty; use tempfile::TempDir; use super::*; @@ -209,7 +210,7 @@ async fn preserve_nullability_on_projection() -> Result<()> { let schema: Schema = ctx.table("test").unwrap().schema().clone().into(); assert!(!schema.field_with_name("c1")?.is_nullable()); - let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)? + let plan = scan_empty(None, &schema, None)? .project(vec![col("c1")])? .build()?; From f6bf0c73d921d015c8c062ceb2c5b83b574bb172 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 May 2022 07:22:52 -0600 Subject: [PATCH 3/4] remove LogicalPlanBuilder::scan_empty --- .../rust/scheduler/src/scheduler_server/mod.rs | 3 ++- datafusion/core/src/logical_plan/builder.rs | 15 ++------------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 5ff51e07514eb..33c15c57b9603 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -297,6 +297,7 @@ mod test { use datafusion::execution::context::default_session_builder; use datafusion::logical_plan::{col, sum, LogicalPlan, LogicalPlanBuilder}; use datafusion::prelude::{SessionConfig, SessionContext}; + use datafusion::test_util::scan_empty; use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::scheduler_server::SchedulerServer; @@ -606,7 +607,7 @@ mod test { Field::new("gmv", DataType::UInt64, false), ]); - LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1])) + scan_empty(None, &schema, Some(vec![0, 1])) .unwrap() .aggregate(vec![col("id")], vec![sum(col("gmv"))]) .unwrap() diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index cb9107e540327..e84313f82f66f 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -17,7 +17,7 @@ //! This module provides a builder for creating LogicalPlans -use crate::datasource::{empty::EmptyTable, TableProvider}; +use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_expr::ExprSchemable; use crate::logical_plan::plan::{ @@ -52,7 +52,7 @@ pub const UNNAMED_TABLE: &str = "?table?"; /// Builder for logical plans /// -/// ``` +/// ``` ignore /// # use datafusion::prelude::*; /// # use datafusion::logical_plan::LogicalPlanBuilder; /// # use datafusion::error::Result; @@ -188,17 +188,6 @@ impl LogicalPlanBuilder { Ok(Self::from(LogicalPlan::Values(Values { schema, values }))) } - /// Scan an empty data source, mainly used in tests - pub fn scan_empty( - name: Option<&str>, - table_schema: &Schema, - projection: Option>, - ) -> Result { - let table_schema = Arc::new(table_schema.clone()); - let provider = Arc::new(EmptyTable::new(table_schema)); - Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) - } - /// Convert a table provider into a builder with a TableScan pub fn scan( table_name: impl Into, From 77807f08f560d666bd1dbd6b1820bf7c3d1f2913 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 May 2022 07:37:49 -0600 Subject: [PATCH 4/4] LogicalPlanBuilder now uses TableSource instead of TableProvider --- .../rust/core/src/serde/logical_plan/mod.rs | 8 ++++---- .../rust/scheduler/src/scheduler_server/mod.rs | 2 +- .../examples/custom_datasource.rs | 16 ++++++++++------ datafusion/core/src/execution/context.rs | 15 +++++++++------ datafusion/core/src/logical_plan/builder.rs | 16 ++++++++-------- datafusion/core/src/sql/planner.rs | 16 ++++++++++------ datafusion/core/src/test_util.rs | 8 ++++++-- datafusion/core/tests/parquet_pruning.rs | 17 +++++++++++------ datafusion/core/tests/sql/projection.rs | 9 +++++---- 9 files changed, 64 insertions(+), 43 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 070dad98a79ad..f088f2f1fbb7b 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use datafusion::logical_plan::{ - source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, - CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, - Offset, Repartition, TableScan, Values, + provider_as_source, source_as_provider, Column, CreateCatalog, CreateCatalogSchema, + CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, + LogicalPlanBuilder, Offset, Repartition, TableScan, Values, }; use datafusion::prelude::SessionContext; @@ -252,7 +252,7 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanBuilder::scan_with_filters( &scan.table_name, - Arc::new(provider), + provider_as_source(Arc::new(provider)), projection, filters, )? diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 33c15c57b9603..d61f570cb4c22 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -295,7 +295,7 @@ mod test { use ballista_core::serde::BallistaCodec; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::execution::context::default_session_builder; - use datafusion::logical_plan::{col, sum, LogicalPlan, LogicalPlanBuilder}; + use datafusion::logical_plan::{col, sum, LogicalPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion::test_util::scan_empty; diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index a9a8ef7aa22ac..a814e585e4aa9 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -23,7 +23,7 @@ use datafusion::dataframe::DataFrame; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; -use datafusion::logical_plan::{Expr, LogicalPlanBuilder}; +use datafusion::logical_plan::{provider_as_source, Expr, LogicalPlanBuilder}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ @@ -60,11 +60,15 @@ async fn search_accounts( let ctx = SessionContext::new(); // create logical plan composed of a single TableScan - let logical_plan = - LogicalPlanBuilder::scan_with_filters("accounts", Arc::new(db), None, vec![]) - .unwrap() - .build() - .unwrap(); + let logical_plan = LogicalPlanBuilder::scan_with_filters( + "accounts", + provider_as_source(Arc::new(db)), + None, + vec![], + ) + .unwrap() + .build() + .unwrap(); let mut dataframe = DataFrame::new(ctx.state, &logical_plan) .select_columns(&["id", "bank_account"])?; diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ca3bca61d80a2..629adf137fe31 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -61,9 +61,9 @@ use crate::datasource::listing::ListingTableConfig; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, - UNNAMED_TABLE, + provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable, + CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, + LogicalPlanBuilder, UNNAMED_TABLE, }; use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use crate::optimizer::filter_push_down::FilterPushDown; @@ -586,7 +586,9 @@ impl SessionContext { .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; - let plan = LogicalPlanBuilder::scan(path, Arc::new(provider), None)?.build()?; + let plan = + LogicalPlanBuilder::scan(path, provider_as_source(Arc::new(provider)), None)? + .build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } @@ -620,7 +622,8 @@ impl SessionContext { pub fn read_table(&self, provider: Arc) -> Result> { Ok(Arc::new(DataFrame::new( self.state.clone(), - &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?, + &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)? + .build()?, ))) } @@ -817,7 +820,7 @@ impl SessionContext { Some(ref provider) => { let plan = LogicalPlanBuilder::scan( table_ref.table(), - Arc::clone(provider), + provider_as_source(Arc::clone(provider)), None, )? .build()?; diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index e84313f82f66f..24658e526a3a6 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -17,7 +17,6 @@ //! This module provides a builder for creating LogicalPlans -use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_expr::ExprSchemable; use crate::logical_plan::plan::{ @@ -41,11 +40,12 @@ use std::{ use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; use crate::logical_plan::{ columnize_expr, exprlist_to_fields, normalize_col, normalize_cols, - provider_as_source, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, - DFSchemaRef, Limit, Offset, Partitioning, Repartition, Values, + rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, + Offset, Partitioning, Repartition, Values, }; use datafusion_common::ToDFSchema; +use datafusion_expr::TableSource; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -191,16 +191,16 @@ impl LogicalPlanBuilder { /// Convert a table provider into a builder with a TableScan pub fn scan( table_name: impl Into, - provider: Arc, + table_source: Arc, projection: Option>, ) -> Result { - Self::scan_with_filters(table_name, provider, projection, vec![]) + Self::scan_with_filters(table_name, table_source, projection, vec![]) } /// Convert a table provider into a builder with a TableScan pub fn scan_with_filters( table_name: impl Into, - provider: Arc, + table_source: Arc, projection: Option>, filters: Vec, ) -> Result { @@ -212,7 +212,7 @@ impl LogicalPlanBuilder { )); } - let schema = provider.schema(); + let schema = table_source.schema(); let projected_schema = projection .as_ref() @@ -232,7 +232,7 @@ impl LogicalPlanBuilder { let table_scan = LogicalPlan::TableScan(TableScan { table_name, - source: provider_as_source(provider), + source: table_source, projected_schema: Arc::new(projected_schema), projection, filters, diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 85c2d8f0cc150..f16ba048e1ba0 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -28,10 +28,11 @@ use crate::datasource::TableProvider; use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ - and, col, lit, normalize_col, normalize_col_with_schemas, union_with_alias, Column, - CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, - CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, FileType, - LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, + and, col, lit, normalize_col, normalize_col_with_schemas, provider_as_source, + union_with_alias, Column, CreateCatalog, CreateCatalogSchema, + CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, + DFSchema, DFSchemaRef, DropTable, Expr, FileType, LogicalPlan, LogicalPlanBuilder, + Operator, PlanType, ToDFSchema, ToStringifiedPlan, }; use crate::prelude::JoinType; use crate::scalar::ScalarValue; @@ -714,8 +715,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { _ => Ok(cte_plan.clone()), }, (_, Ok(provider)) => { - let scan = - LogicalPlanBuilder::scan(&table_name, provider, None); + let scan = LogicalPlanBuilder::scan( + &table_name, + provider_as_source(provider), + None, + ); let scan = match table_alias.as_ref() { Some(ref name) => scan?.alias(name.to_owned().as_str()), _ => scan, diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 3e174e9dc67b7..1a6a5028e0cb0 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -21,7 +21,7 @@ use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; use crate::datasource::empty::EmptyTable; -use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use crate::logical_plan::{provider_as_source, LogicalPlanBuilder, UNNAMED_TABLE}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::DataFusionError; @@ -243,7 +243,11 @@ pub fn scan_empty( ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); - LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) + LogicalPlanBuilder::scan( + name.unwrap_or(UNNAMED_TABLE), + provider_as_source(provider), + projection, + ) } /// Get the schema for the aggregate_test_* csv files diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs index 0d580f2d27732..7e7caa959ffd4 100644 --- a/datafusion/core/tests/parquet_pruning.rs +++ b/datafusion/core/tests/parquet_pruning.rs @@ -30,6 +30,7 @@ use arrow::{ util::pretty::pretty_format_batches, }; use chrono::{Datelike, Duration}; +use datafusion::logical_plan::provider_as_source; use datafusion::{ datasource::TableProvider, logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}, @@ -544,12 +545,16 @@ impl ContextWithParquet { /// the number of output rows and normalized execution metrics async fn query_with_expr(&mut self, expr: Expr) -> TestOutput { let sql = format!("EXPR only: {:?}", expr); - let logical_plan = LogicalPlanBuilder::scan("t", self.provider.clone(), None) - .unwrap() - .filter(expr) - .unwrap() - .build() - .unwrap(); + let logical_plan = LogicalPlanBuilder::scan( + "t", + provider_as_source(self.provider.clone()), + None, + ) + .unwrap() + .filter(expr) + .unwrap() + .build() + .unwrap(); self.run_test(logical_plan, sql).await } diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index e1b1742bfa2da..c74445bfde524 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use datafusion::logical_plan::{provider_as_source, LogicalPlanBuilder, UNNAMED_TABLE}; use datafusion::test_util::scan_empty; use tempfile::TempDir; @@ -239,9 +239,10 @@ async fn projection_on_memory_scan() -> Result<()> { )?]]; let provider = Arc::new(MemTable::try_new(schema, partitions)?); - let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)? - .project(vec![col("b")])? - .build()?; + let plan = + LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)? + .project(vec![col("b")])? + .build()?; assert_fields_eq(&plan, vec!["b"]); let ctx = SessionContext::new();