From 5501713c9e60469ab86749dde9d06b05eb2b8cd4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 07:10:06 -0600 Subject: [PATCH 1/8] Introduce new TableSource trait in expr crate --- ballista/rust/client/src/context.rs | 10 +-- .../core/src/catalog/information_schema.rs | 3 +- datafusion/core/src/dataframe.rs | 2 +- datafusion/core/src/datasource/datasource.rs | 29 +------- .../core/src/datasource/listing/table.rs | 17 +++-- datafusion/core/src/datasource/mod.rs | 3 +- datafusion/core/src/logical_plan/builder.rs | 6 +- datafusion/core/src/logical_plan/mod.rs | 1 + datafusion/core/src/logical_plan/plan.rs | 64 ++++++++++++++++- .../core/src/optimizer/filter_push_down.rs | 13 ++-- .../src/optimizer/projection_push_down.rs | 6 +- datafusion/core/src/physical_plan/planner.rs | 5 +- .../core/tests/provider_filter_pushdown.rs | 4 +- datafusion/expr/src/lib.rs | 2 + datafusion/expr/src/table_source.rs | 72 +++++++++++++++++++ 15 files changed, 177 insertions(+), 60 deletions(-) create mode 100644 datafusion/expr/src/table_source.rs diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 7dc7ec63b9564..0d3b4522afa64 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -34,7 +34,9 @@ use datafusion::catalog::TableReference; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; -use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan}; +use datafusion::logical_plan::{ + source_as_provider, CreateExternalTable, LogicalPlan, TableScan, +}; use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext, }; @@ -270,7 +272,7 @@ impl BallistaContext { ) -> Result<()> { match self.read_csv(path, options).await?.to_logical_plan() { LogicalPlan::TableScan(TableScan { source, .. }) => { - self.register_table(name, source) + self.register_table(name, source_as_provider(&source)?) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), } @@ -284,7 +286,7 @@ impl BallistaContext { ) -> Result<()> { match self.read_parquet(path, options).await?.to_logical_plan() { LogicalPlan::TableScan(TableScan { source, .. }) => { - self.register_table(name, source) + self.register_table(name, source_as_provider(&source)?) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), } @@ -298,7 +300,7 @@ impl BallistaContext { ) -> Result<()> { match self.read_avro(path, options).await?.to_logical_plan() { LogicalPlan::TableScan(TableScan { source, .. }) => { - self.register_table(name, source) + self.register_table(name, source_as_provider(&source)?) } _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())), } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index 38306150a9da3..7df9aec95949e 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -31,7 +31,8 @@ use arrow::{ }; use datafusion_common::Result; -use crate::datasource::{MemTable, TableProvider, TableType}; +use crate::datasource::{MemTable, TableProvider}; +use crate::logical_expr::TableType; use super::{ catalog::{CatalogList, CatalogProvider}, diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index eac9ae29ada10..5a7017486b9ea 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -33,8 +33,8 @@ use crate::arrow::datatypes::Schema; use crate::arrow::datatypes::SchemaRef; use crate::arrow::util::pretty; use crate::datasource::TableProvider; -use crate::datasource::TableType; use crate::execution::context::{SessionState, TaskContext}; +use crate::logical_expr::TableType; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 1b59c857fb074..f4fdc975dcfae 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -21,40 +21,13 @@ use std::any::Any; use std::sync::Arc; use async_trait::async_trait; +use datafusion_expr::{TableProviderFilterPushDown, TableType}; use crate::arrow::datatypes::SchemaRef; use crate::error::Result; use crate::logical_plan::Expr; use crate::physical_plan::ExecutionPlan; -/// Indicates whether and how a filter expression can be handled by a -/// TableProvider for table scans. -#[derive(Debug, Clone, PartialEq)] -pub enum TableProviderFilterPushDown { - /// The expression cannot be used by the provider. - Unsupported, - /// The expression can be used to help minimise the data retrieved, - /// but the provider cannot guarantee that all returned tuples - /// satisfy the filter. The Filter plan node containing this expression - /// will be preserved. - Inexact, - /// The provider guarantees that all returned data satisfies this - /// filter expression. The Filter plan node containing this expression - /// will be removed. - Exact, -} - -/// Indicates the type of this table for metadata/catalog purposes. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum TableType { - /// An ordinary physical table. - Base, - /// A non-materialised table that itself uses a query internally to provide data. - View, - /// A transient table. - Temporary, -} - /// Source table #[async_trait] pub trait TableProvider: Sync + Send { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 18f12d03a494a..9e554c13d32ec 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -23,6 +23,14 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use async_trait::async_trait; use futures::StreamExt; +use crate::datasource::{ + file_format::{ + avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, + FileFormat, + }, + get_statistics_with_limit, TableProvider, +}; +use crate::logical_expr::TableProviderFilterPushDown; use crate::{ error::{DataFusionError, Result}, logical_plan::Expr, @@ -33,15 +41,6 @@ use crate::{ }, }; -use crate::datasource::{ - datasource::TableProviderFilterPushDown, - file_format::{ - avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, - FileFormat, - }, - get_statistics_with_limit, TableProvider, -}; - use super::PartitionedFile; use datafusion_data_access::object_store::ObjectStore; diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 2a801ff218fe2..f4d059e3dc608 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -27,11 +27,12 @@ pub mod object_store_registry; use futures::Stream; -pub use self::datasource::{TableProvider, TableType}; +pub use self::datasource::TableProvider; use self::listing::PartitionedFile; pub use self::memory::MemTable; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; +pub use crate::logical_expr::TableType; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics}; use futures::StreamExt; diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index c88b25d0a2251..14889bb71b805 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -25,8 +25,8 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ - Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, + Aggregate, Analyze, DefaultTableSource, EmptyRelation, Explain, Filter, Join, + Projection, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; use crate::prelude::*; @@ -449,7 +449,7 @@ impl LogicalPlanBuilder { let table_scan = LogicalPlan::TableScan(TableScan { table_name, - source: provider, + source: Arc::new(DefaultTableSource::new(provider)), projected_schema: Arc::new(projected_schema), projection, filters, diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index cc5023008f53d..33b3db061673a 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -62,6 +62,7 @@ pub use expr_simplier::{ExprSimplifiable, SimplifyInfo}; pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; pub use extension::UserDefinedLogicalNode; pub use operators::Operator; +pub use plan::{provider_as_source, source_as_provider}; pub use plan::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 66307c6aba464..949fdd9d89de5 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -20,12 +20,14 @@ use super::display::{GraphvizVisitor, IndentVisitor}; use super::expr::{Column, Expr}; use super::extension::UserDefinedLogicalNode; -use crate::datasource::datasource::TableProviderFilterPushDown; use crate::datasource::TableProvider; use crate::error::DataFusionError; +use crate::logical_expr::TableProviderFilterPushDown; use crate::logical_plan::dfschema::DFSchemaRef; use crate::sql::parser::FileType; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_expr::TableSource; +use std::any::Any; use std::fmt::Formatter; use std::{ collections::HashSet, @@ -125,13 +127,71 @@ pub struct Window { pub schema: DFSchemaRef, } +/// DataFusion default table source, wrapping TableProvider +pub struct DefaultTableSource { + /// table provider + pub table_provider: Arc, +} + +impl DefaultTableSource { + /// Create a new DefaultTableSource to wrap a TableProvider + pub fn new(table_provider: Arc) -> Self { + Self { table_provider } + } +} + +impl TableSource for DefaultTableSource { + /// Returns the table source as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any { + self + } + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef { + self.table_provider.schema() + } + + /// Tests whether the table provider can make use of a filter expression + /// to optimise data retrieval. + fn supports_filter_pushdown( + &self, + filter: &Expr, + ) -> datafusion_common::Result { + self.table_provider.supports_filter_pushdown(filter) + } +} + +/// Wrap TableProvider in TableSource +pub fn provider_as_source( + table_provider: Arc, +) -> Arc { + Arc::new(DefaultTableSource::new(table_provider)) +} + +/// Extract TableProvider from TableSource +pub fn source_as_provider( + source: &Arc, +) -> datafusion_common::Result> { + match source + .as_ref() + .as_any() + .downcast_ref::() + { + Some(source) => Ok(source.table_provider.clone()), + _ => Err(DataFusionError::Internal( + "TableSource was not DefaultTableSource".to_string(), + )), + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { /// The name of the table pub table_name: String, /// The source of the table - pub source: Arc, + pub source: Arc, /// Optional column indices to use as a projection pub projection: Option>, /// The schema description of the output diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 30a7ee97328e8..f4c0cb5c1829a 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,9 +14,11 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan -use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union}; +use crate::logical_expr::TableProviderFilterPushDown; +use crate::logical_plan::plan::{ + provider_as_source, source_as_provider, Aggregate, Filter, Join, Projection, Union, +}; use crate::logical_plan::{ and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, }; @@ -506,6 +508,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }) => { let mut used_columns = HashSet::new(); let mut new_filters = filters.clone(); + let source = source_as_provider(source)?; for (filter_expr, cols) in &state.filters { let (preserve_filter_node, add_to_provider) = @@ -533,7 +536,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { state, used_columns, &LogicalPlan::TableScan(TableScan { - source: source.clone(), + source: provider_as_source(source), projection: projection.clone(), projected_schema: projected_schema.clone(), table_name: table_name.clone(), @@ -1417,7 +1420,7 @@ mod tests { (*test_provider.schema()).clone(), )?), projection: None, - source: Arc::new(test_provider), + source: provider_as_source(Arc::new(test_provider)), limit: None, }); @@ -1490,7 +1493,7 @@ mod tests { (*test_provider.schema()).clone(), )?), projection: Some(vec![0]), - source: Arc::new(test_provider), + source: provider_as_source(Arc::new(test_provider)), limit: None, }); diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 10bf5d10f9602..6cb818c9c0c59 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -21,7 +21,8 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, + provider_as_source, source_as_provider, Aggregate, Analyze, Join, Projection, + SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, @@ -342,6 +343,7 @@ fn optimize_plan( limit, .. }) => { + let source = source_as_provider(source)?; let (projection, projected_schema) = get_projected_schema( Some(table_name), &source.schema(), @@ -351,7 +353,7 @@ fn optimize_plan( // return the table scan with projection Ok(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), - source: source.clone(), + source: provider_as_source(source.clone()), projection: Some(projection), projected_schema, filters: filters.clone(), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 98076d1365bcd..5b34a65dc9f91 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -24,8 +24,8 @@ use super::{ }; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_plan::plan::{ - Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan, - Window, + source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort, + SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, @@ -339,6 +339,7 @@ impl DefaultPhysicalPlanner { limit, .. }) => { + let source = source_as_provider(source)?; // Remove all qualifiers from the scan as the provider // doesn't know (nor should care) how the relation was // referred to in the query diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index cfd90399149d5..664e77e182e20 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -19,10 +19,10 @@ use arrow::array::{as_primitive_array, Int32Builder, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion::datasource::datasource::{TableProvider, TableProviderFilterPushDown}; +use datafusion::datasource::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::context::{SessionContext, TaskContext}; -use datafusion::logical_plan::Expr; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index e6220e96c0117..81ed3d85c502b 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -30,6 +30,7 @@ mod literal; mod nullif; mod operator; mod signature; +mod table_source; pub mod type_coercion; mod udaf; mod udf; @@ -50,6 +51,7 @@ pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use nullif::SUPPORTED_NULLIF_TYPES; pub use operator::Operator; pub use signature::{Signature, TypeSignature, Volatility}; +pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::AggregateUDF; pub use udf::ScalarUDF; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs new file mode 100644 index 0000000000000..dbf1cd85fdbe5 --- /dev/null +++ b/datafusion/expr/src/table_source.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Expr; +use arrow::datatypes::SchemaRef; +use std::any::Any; + +///! Table source + +/// Indicates whether and how a filter expression can be handled by a +/// TableProvider for table scans. +#[derive(Debug, Clone, PartialEq)] +pub enum TableProviderFilterPushDown { + /// The expression cannot be used by the provider. + Unsupported, + /// The expression can be used to help minimise the data retrieved, + /// but the provider cannot guarantee that all returned tuples + /// satisfy the filter. The Filter plan node containing this expression + /// will be preserved. + Inexact, + /// The provider guarantees that all returned data satisfies this + /// filter expression. The Filter plan node containing this expression + /// will be removed. + Exact, +} + +/// Indicates the type of this table for metadata/catalog purposes. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TableType { + /// An ordinary physical table. + Base, + /// A non-materialised table that itself uses a query internally to provide data. + View, + /// A transient table. + Temporary, +} + +/// TableSource +pub trait TableSource: Sync + Send { + fn as_any(&self) -> &dyn Any; + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef; + + /// Get the type of this table for metadata/catalog purposes. + fn table_type(&self) -> TableType { + TableType::Base + } + + /// Tests whether the table provider can make use of a filter expression + /// to optimise data retrieval. + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> datafusion_common::Result { + Ok(TableProviderFilterPushDown::Unsupported) + } +} From b154ac3a5f44082916c1a4fed58bbbd2b1618f29 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 07:20:26 -0600 Subject: [PATCH 2/8] fix regressions --- ballista/rust/core/src/serde/logical_plan/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index a0264271a5eea..ca94fa4477bac 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window, }; use datafusion::logical_plan::{ - Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, - JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, - Values, + source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, + CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, + TableScan, Values, }; use datafusion::prelude::SessionContext; @@ -510,6 +510,7 @@ impl AsLogicalPlan for LogicalPlanNode { projection, .. }) => { + let source = source_as_provider(source)?; let schema = source.schema(); let source = source.as_any(); @@ -982,6 +983,7 @@ mod roundtrip_tests { use crate::serde::{AsLogicalPlan, BallistaCodec}; use async_trait::async_trait; use core::panic; + use datafusion::logical_plan::source_as_provider; use datafusion::{ arrow::datatypes::{DataType, Field, Schema}, datafusion_data_access::{ @@ -1435,7 +1437,8 @@ mod roundtrip_tests { let round_trip_store = match round_trip { LogicalPlan::TableScan(scan) => { - match scan.source.as_ref().as_any().downcast_ref::() { + let source = source_as_provider(&scan.source)?; + match source.as_ref().as_any().downcast_ref::() { Some(listing_table) => { format!("{:?}", listing_table.object_store()) } From 148242d4db5a52d4707170dbfe7296512b995e64 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 09:46:45 -0600 Subject: [PATCH 3/8] code cleanup --- datafusion/core/src/logical_plan/builder.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 14889bb71b805..810edcb91d4a9 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -45,10 +45,7 @@ use std::{ use super::dfschema::ToDFSchema; use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; -use crate::logical_plan::{ - columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column, - CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values, -}; +use crate::logical_plan::{columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values, provider_as_source}; use crate::sql::utils::group_window_expr_by_sort_keys; /// Default table name for unnamed table @@ -449,7 +446,7 @@ impl LogicalPlanBuilder { let table_scan = LogicalPlan::TableScan(TableScan { table_name, - source: Arc::new(DefaultTableSource::new(provider)), + source: provider_as_source(provider), projected_schema: Arc::new(projected_schema), projection, filters, From 2495265fcb0c8f834d87a8ef5ab982e641638973 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 09:46:54 -0600 Subject: [PATCH 4/8] fmt --- datafusion/core/src/logical_plan/builder.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 810edcb91d4a9..a8fd4ac130f65 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -45,7 +45,11 @@ use std::{ use super::dfschema::ToDFSchema; use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; -use crate::logical_plan::{columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values, provider_as_source}; +use crate::logical_plan::{ + columnize_expr, normalize_col, normalize_cols, provider_as_source, + rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, + Partitioning, Repartition, Values, +}; use crate::sql::utils::group_window_expr_by_sort_keys; /// Default table name for unnamed table From ef1aaaa1259aba035e407b7ba92f1e7cc9aa738a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 10:24:19 -0600 Subject: [PATCH 5/8] remove redundant shim calls --- datafusion/core/src/logical_plan/builder.rs | 4 ++-- datafusion/core/src/optimizer/filter_push_down.rs | 13 +++++++------ .../core/src/optimizer/projection_push_down.rs | 6 ++---- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index a8fd4ac130f65..e417b0dc13a3c 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -25,8 +25,8 @@ use crate::datasource::{ use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ - Aggregate, Analyze, DefaultTableSource, EmptyRelation, Explain, Filter, Join, - Projection, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, + Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, + SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, }; use crate::optimizer::utils; use crate::prelude::*; diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index f4c0cb5c1829a..19535de86b9f5 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -16,9 +16,7 @@ use crate::execution::context::ExecutionProps; use crate::logical_expr::TableProviderFilterPushDown; -use crate::logical_plan::plan::{ - provider_as_source, source_as_provider, Aggregate, Filter, Join, Projection, Union, -}; +use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union}; use crate::logical_plan::{ and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, }; @@ -508,7 +506,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }) => { let mut used_columns = HashSet::new(); let mut new_filters = filters.clone(); - let source = source_as_provider(source)?; for (filter_expr, cols) in &state.filters { let (preserve_filter_node, add_to_provider) = @@ -536,7 +533,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { state, used_columns, &LogicalPlan::TableScan(TableScan { - source: provider_as_source(source), + source: source.clone(), projection: projection.clone(), projected_schema: projected_schema.clone(), table_name: table_name.clone(), @@ -602,7 +599,11 @@ mod tests { }; use crate::physical_plan::ExecutionPlan; use crate::test::*; - use crate::{logical_plan::col, prelude::JoinType}; + use crate::{ + logical_plan::{col, plan::provider_as_source}, + prelude::JoinType, + }; + use arrow::datatypes::SchemaRef; use async_trait::async_trait; diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 6cb818c9c0c59..10bf5d10f9602 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -21,8 +21,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ - provider_as_source, source_as_provider, Aggregate, Analyze, Join, Projection, - SubqueryAlias, TableScan, Window, + Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, @@ -343,7 +342,6 @@ fn optimize_plan( limit, .. }) => { - let source = source_as_provider(source)?; let (projection, projected_schema) = get_projected_schema( Some(table_name), &source.schema(), @@ -353,7 +351,7 @@ fn optimize_plan( // return the table scan with projection Ok(LogicalPlan::TableScan(TableScan { table_name: table_name.clone(), - source: provider_as_source(source.clone()), + source: source.clone(), projection: Some(projection), projected_schema, filters: filters.clone(), From b3b44db6e8d0d26410d097ede3ed774058ea2a62 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 15:25:07 -0600 Subject: [PATCH 6/8] Update datafusion/core/src/logical_plan/plan.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/logical_plan/plan.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 949fdd9d89de5..fc3cb5c462429 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -128,6 +128,9 @@ pub struct Window { } /// DataFusion default table source, wrapping TableProvider +/// +/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource` +/// (logical plan trait) pub struct DefaultTableSource { /// table provider pub table_provider: Arc, From 635d33d6896f5bf7f1b3998f1383ce78c5e99c34 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 15:31:37 -0600 Subject: [PATCH 7/8] add documentation --- datafusion/core/src/logical_plan/plan.rs | 3 ++- datafusion/expr/src/table_source.rs | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 949fdd9d89de5..92598b5532f68 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -169,7 +169,8 @@ pub fn provider_as_source( Arc::new(DefaultTableSource::new(table_provider)) } -/// Extract TableProvider from TableSource +/// Attempt to downcast a TableSource to DefaultTableSource and access the +/// TableProvider. This will only work with a TableSource created by DataFusion. pub fn source_as_provider( source: &Arc, ) -> datafusion_common::Result> { diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index dbf1cd85fdbe5..8e441e4846f43 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -49,7 +49,14 @@ pub enum TableType { Temporary, } -/// TableSource +/// The TableSource trait is used during logical query planning and optimizations and +/// provides access to schema information and filter push-down capabilities. This trait +/// provides a subset of the functionality of the TableProvider trait in the core +/// datafusion crate. The TableProvider trait provides additional capabilities needed for +/// physical query execution (such as the ability to perform a scan). The reason for +/// having two separate traits is to avoid having the logical plan code be dependent +/// on the DataFusion execution engine. Other projects may want to use DataFusion's +/// logical plans and have their own execution engine. pub trait TableSource: Sync + Send { fn as_any(&self) -> &dyn Any; From 5aea3ad640552618644f22fe9b0c61433d81691e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Apr 2022 15:47:07 -0600 Subject: [PATCH 8/8] fmt --- datafusion/core/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index eaade499795a9..a26d473580bef 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -128,7 +128,7 @@ pub struct Window { /// DataFusion default table source, wrapping TableProvider /// -/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource` +/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource` /// (logical plan trait) pub struct DefaultTableSource { /// table provider