diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 070dad98a79ad..f088f2f1fbb7b 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use datafusion::logical_plan::{ - source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, - CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, - Offset, Repartition, TableScan, Values, + provider_as_source, source_as_provider, Column, CreateCatalog, CreateCatalogSchema, + CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, + LogicalPlanBuilder, Offset, Repartition, TableScan, Values, }; use datafusion::prelude::SessionContext; @@ -252,7 +252,7 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanBuilder::scan_with_filters( &scan.table_name, - Arc::new(provider), + provider_as_source(Arc::new(provider)), projection, filters, )? diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index a9a8ef7aa22ac..a814e585e4aa9 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -23,7 +23,7 @@ use datafusion::dataframe::DataFrame; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; -use datafusion::logical_plan::{Expr, LogicalPlanBuilder}; +use datafusion::logical_plan::{provider_as_source, Expr, LogicalPlanBuilder}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ @@ -60,11 +60,15 @@ async fn search_accounts( let ctx = SessionContext::new(); // create logical plan composed of a single TableScan - let logical_plan = - LogicalPlanBuilder::scan_with_filters("accounts", Arc::new(db), None, vec![]) - .unwrap() - .build() - .unwrap(); + let logical_plan = LogicalPlanBuilder::scan_with_filters( + "accounts", + provider_as_source(Arc::new(db)), + None, + vec![], + ) + .unwrap() + .build() + .unwrap(); let mut dataframe = DataFrame::new(ctx.state, &logical_plan) .select_columns(&["id", "bank_account"])?; diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ca3bca61d80a2..629adf137fe31 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -61,9 +61,9 @@ use crate::datasource::listing::ListingTableConfig; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, - UNNAMED_TABLE, + provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable, + CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, + LogicalPlanBuilder, UNNAMED_TABLE, }; use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use crate::optimizer::filter_push_down::FilterPushDown; @@ -586,7 +586,9 @@ impl SessionContext { .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; - let plan = LogicalPlanBuilder::scan(path, Arc::new(provider), None)?.build()?; + let plan = + LogicalPlanBuilder::scan(path, provider_as_source(Arc::new(provider)), None)? + .build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } @@ -620,7 +622,8 @@ impl SessionContext { pub fn read_table(&self, provider: Arc) -> Result> { Ok(Arc::new(DataFrame::new( self.state.clone(), - &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?, + &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)? + .build()?, ))) } @@ -817,7 +820,7 @@ impl SessionContext { Some(ref provider) => { let plan = LogicalPlanBuilder::scan( table_ref.table(), - Arc::clone(provider), + provider_as_source(Arc::clone(provider)), None, )? .build()?; diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index f8b1fefdd08b0..cad1b0a6a7095 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -17,7 +17,6 @@ //! This module provides a builder for creating LogicalPlans -use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_expr::ExprSchemable; use crate::logical_plan::plan::{ @@ -41,11 +40,12 @@ use std::{ use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; use crate::logical_plan::{ columnize_expr, exprlist_to_fields, normalize_col, normalize_cols, - provider_as_source, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, - DFSchemaRef, Limit, Offset, Partitioning, Repartition, Values, + rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, + Offset, Partitioning, Repartition, Values, }; use datafusion_common::ToDFSchema; +use datafusion_expr::TableSource; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -191,16 +191,16 @@ impl LogicalPlanBuilder { /// Convert a table provider into a builder with a TableScan pub fn scan( table_name: impl Into, - provider: Arc, + table_source: Arc, projection: Option>, ) -> Result { - Self::scan_with_filters(table_name, provider, projection, vec![]) + Self::scan_with_filters(table_name, table_source, projection, vec![]) } /// Convert a table provider into a builder with a TableScan pub fn scan_with_filters( table_name: impl Into, - provider: Arc, + table_source: Arc, projection: Option>, filters: Vec, ) -> Result { @@ -212,7 +212,7 @@ impl LogicalPlanBuilder { )); } - let schema = provider.schema(); + let schema = table_source.schema(); let projected_schema = projection .as_ref() @@ -232,7 +232,7 @@ impl LogicalPlanBuilder { let table_scan = LogicalPlan::TableScan(TableScan { table_name, - source: provider_as_source(provider), + source: table_source, projected_schema: Arc::new(projected_schema), projection, filters, diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index b4efc086829e8..d698619afc56d 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -28,10 +28,11 @@ use crate::datasource::TableProvider; use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ - and, col, lit, normalize_col, normalize_col_with_schemas, Column, CreateCatalog, - CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, - CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, FileType, - LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, + and, col, lit, normalize_col, normalize_col_with_schemas, provider_as_source, + union_with_alias, Column, CreateCatalog, CreateCatalogSchema, + CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, + DFSchema, DFSchemaRef, DropTable, Expr, FileType, LogicalPlan, LogicalPlanBuilder, + Operator, PlanType, ToDFSchema, ToStringifiedPlan, }; use crate::prelude::JoinType; use crate::scalar::ScalarValue; @@ -713,8 +714,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { _ => Ok(cte_plan.clone()), }, (_, Ok(provider)) => { - let scan = - LogicalPlanBuilder::scan(&table_name, provider, None); + let scan = LogicalPlanBuilder::scan( + &table_name, + provider_as_source(provider), + None, + ); let scan = match table_alias.as_ref() { Some(ref name) => scan?.alias(name.to_owned().as_str()), _ => scan, diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 3e174e9dc67b7..1a6a5028e0cb0 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -21,7 +21,7 @@ use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; use crate::datasource::empty::EmptyTable; -use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use crate::logical_plan::{provider_as_source, LogicalPlanBuilder, UNNAMED_TABLE}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::DataFusionError; @@ -243,7 +243,11 @@ pub fn scan_empty( ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); - LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) + LogicalPlanBuilder::scan( + name.unwrap_or(UNNAMED_TABLE), + provider_as_source(provider), + projection, + ) } /// Get the schema for the aggregate_test_* csv files diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs index 0d580f2d27732..7e7caa959ffd4 100644 --- a/datafusion/core/tests/parquet_pruning.rs +++ b/datafusion/core/tests/parquet_pruning.rs @@ -30,6 +30,7 @@ use arrow::{ util::pretty::pretty_format_batches, }; use chrono::{Datelike, Duration}; +use datafusion::logical_plan::provider_as_source; use datafusion::{ datasource::TableProvider, logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}, @@ -544,12 +545,16 @@ impl ContextWithParquet { /// the number of output rows and normalized execution metrics async fn query_with_expr(&mut self, expr: Expr) -> TestOutput { let sql = format!("EXPR only: {:?}", expr); - let logical_plan = LogicalPlanBuilder::scan("t", self.provider.clone(), None) - .unwrap() - .filter(expr) - .unwrap() - .build() - .unwrap(); + let logical_plan = LogicalPlanBuilder::scan( + "t", + provider_as_source(self.provider.clone()), + None, + ) + .unwrap() + .filter(expr) + .unwrap() + .build() + .unwrap(); self.run_test(logical_plan, sql).await } diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index e1b1742bfa2da..c74445bfde524 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use datafusion::logical_plan::{provider_as_source, LogicalPlanBuilder, UNNAMED_TABLE}; use datafusion::test_util::scan_empty; use tempfile::TempDir; @@ -239,9 +239,10 @@ async fn projection_on_memory_scan() -> Result<()> { )?]]; let provider = Arc::new(MemTable::try_new(schema, partitions)?); - let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)? - .project(vec![col("b")])? - .build()?; + let plan = + LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)? + .project(vec![col("b")])? + .build()?; assert_fields_eq(&plan, vec!["b"]); let ctx = SessionContext::new();