From bcdf583d7aaf50819f1607d68bde53e870110cdc Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 19 Mar 2025 23:28:08 +0530 Subject: [PATCH 1/7] First Iteration --- .../src/datasource => catalog/src}/cte_worktable.rs | 13 ++++++------- .../src}/default_table_source.rs | 2 +- datafusion/catalog/src/lib.rs | 2 ++ datafusion/core/src/datasource/mod.rs | 13 ++++++------- 4 files changed, 15 insertions(+), 15 deletions(-) rename datafusion/{core/src/datasource => catalog/src}/cte_worktable.rs (93%) rename datafusion/{core/src/datasource => catalog/src}/default_table_source.rs (99%) diff --git a/datafusion/core/src/datasource/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs similarity index 93% rename from datafusion/core/src/datasource/cte_worktable.rs rename to datafusion/catalog/src/cte_worktable.rs index b63755f644a84..d72a30909c02c 100644 --- a/datafusion/core/src/datasource/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -20,18 +20,17 @@ use std::sync::Arc; use std::{any::Any, borrow::Cow}; +use crate::Session; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_catalog::Session; use datafusion_physical_plan::work_table::WorkTableExec; -use crate::{ - error::Result, - logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown}, - physical_plan::ExecutionPlan, -}; +use datafusion_physical_plan::ExecutionPlan; -use crate::datasource::{TableProvider, TableType}; +use datafusion_common::error::Result; +use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType}; + +use crate::TableProvider; /// The temporary working table where the previous iteration of a recursive query is stored /// Naming is based on PostgreSQL's implementation. 
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs similarity index 99% rename from datafusion/core/src/datasource/default_table_source.rs rename to datafusion/catalog/src/default_table_source.rs index 541e0b6dfa91c..6d84702feb363 100644 --- a/datafusion/core/src/datasource/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::{any::Any, borrow::Cow}; -use crate::datasource::TableProvider; +use crate::TableProvider; use arrow::datatypes::SchemaRef; use datafusion_common::{internal_err, Constraints}; diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 7ba97fbc9faab..a1c0a6185da40 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -49,6 +49,8 @@ pub use r#async::*; pub use schema::*; pub use session::*; pub use table::*; +pub mod cte_worktable; +pub mod default_table_source; pub mod stream; pub mod streaming; pub mod view; diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 80783b4892c7e..a0d68e1fb69a1 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -19,8 +19,6 @@ //! //! 
[`ListingTable`]: crate::datasource::listing::ListingTable -pub mod cte_worktable; -pub mod default_table_source; pub mod dynamic_file; pub mod empty; pub mod file_format; @@ -32,11 +30,6 @@ pub mod provider; mod statistics; mod view_test; -pub use datafusion_catalog::stream; -pub use datafusion_catalog::view; -pub use datafusion_datasource::schema_adapter; -pub use datafusion_datasource::source; - // backwards compatibility pub use self::default_table_source::{ provider_as_source, source_as_provider, DefaultTableSource, @@ -45,6 +38,12 @@ pub use self::memory::MemTable; pub use self::view::ViewTable; pub use crate::catalog::TableProvider; pub use crate::logical_expr::TableType; +pub use datafusion_catalog::cte_worktable; +pub use datafusion_catalog::default_table_source; +pub use datafusion_catalog::stream; +pub use datafusion_catalog::view; +pub use datafusion_datasource::schema_adapter; +pub use datafusion_datasource::source; pub use datafusion_execution::object_store; pub use datafusion_physical_expr::create_ordering; pub use statistics::get_statistics_with_limit; From 3014f98effa9c4a0f16591ad4345b723904f2024 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Wed, 19 Mar 2025 23:49:16 +0530 Subject: [PATCH 2/7] fix: CI tests --- datafusion/catalog/src/default_table_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs index 6d84702feb363..9db8242caa999 100644 --- a/datafusion/catalog/src/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -133,7 +133,7 @@ fn preserves_table_type() { async fn scan( &self, - _: &dyn datafusion_catalog::Session, + _: &dyn crate::Session, _: Option<&Vec>, _: &[Expr], _: Option, From 27a42cd3873583c5524c9d6cb614da3024b7812d Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 20 Mar 2025 16:47:10 +0530 Subject: [PATCH 3/7] stable waypoint, documentation pending, memory move pending --- 
.../src/{memory.rs => memory/catalog.rs} | 69 +------------- datafusion/catalog/src/memory/mod.rs | 22 +++++ datafusion/catalog/src/memory/schema.rs | 90 +++++++++++++++++++ datafusion/catalog/src/session.rs | 3 + datafusion/core/src/datasource/memory.rs | 3 +- .../core/src/execution/session_state.rs | 10 ++- 6 files changed, 125 insertions(+), 72 deletions(-) rename datafusion/catalog/src/{memory.rs => memory/catalog.rs} (70%) create mode 100644 datafusion/catalog/src/memory/mod.rs create mode 100644 datafusion/catalog/src/memory/schema.rs diff --git a/datafusion/catalog/src/memory.rs b/datafusion/catalog/src/memory/catalog.rs similarity index 70% rename from datafusion/catalog/src/memory.rs rename to datafusion/catalog/src/memory/catalog.rs index d22a98d3d0644..b71888c54e9d6 100644 --- a/datafusion/catalog/src/memory.rs +++ b/datafusion/catalog/src/memory/catalog.rs @@ -18,10 +18,9 @@ //! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory //! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. -use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; -use async_trait::async_trait; +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider}; use dashmap::DashMap; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::exec_err; use std::any::Any; use std::sync::Arc; @@ -134,67 +133,3 @@ impl CatalogProvider for MemoryCatalogProvider { } } } - -/// Simple in-memory implementation of a schema. -#[derive(Debug)] -pub struct MemorySchemaProvider { - tables: DashMap>, -} - -impl MemorySchemaProvider { - /// Instantiates a new MemorySchemaProvider with an empty collection of tables. 
- pub fn new() -> Self { - Self { - tables: DashMap::new(), - } - } -} - -impl Default for MemorySchemaProvider { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl SchemaProvider for MemorySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - self.tables - .iter() - .map(|table| table.key().clone()) - .collect() - } - - async fn table( - &self, - name: &str, - ) -> datafusion_common::Result>, DataFusionError> { - Ok(self.tables.get(name).map(|table| Arc::clone(table.value()))) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> datafusion_common::Result>> { - if self.table_exist(name.as_str()) { - return exec_err!("The table {name} already exists"); - } - Ok(self.tables.insert(name, table)) - } - - fn deregister_table( - &self, - name: &str, - ) -> datafusion_common::Result>> { - Ok(self.tables.remove(name).map(|(_, table)| table)) - } - - fn table_exist(&self, name: &str) -> bool { - self.tables.contains_key(name) - } -} diff --git a/datafusion/catalog/src/memory/mod.rs b/datafusion/catalog/src/memory/mod.rs new file mode 100644 index 0000000000000..4848e9becab5e --- /dev/null +++ b/datafusion/catalog/src/memory/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod catalog; +pub(crate) mod schema; + +pub use catalog::*; +pub use schema::*; \ No newline at end of file diff --git a/datafusion/catalog/src/memory/schema.rs b/datafusion/catalog/src/memory/schema.rs new file mode 100644 index 0000000000000..4afb28f3098e0 --- /dev/null +++ b/datafusion/catalog/src/memory/schema.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory +//! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. + +use crate::{SchemaProvider, TableProvider}; +use async_trait::async_trait; +use dashmap::DashMap; +use datafusion_common::{exec_err, DataFusionError}; +use std::any::Any; +use std::sync::Arc; + +/// Simple in-memory implementation of a schema. +#[derive(Debug)] +pub struct MemorySchemaProvider { + tables: DashMap>, +} + +impl MemorySchemaProvider { + /// Instantiates a new MemorySchemaProvider with an empty collection of tables. 
+ pub fn new() -> Self { + Self { + tables: DashMap::new(), + } + } +} + +impl Default for MemorySchemaProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl SchemaProvider for MemorySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables + .iter() + .map(|table| table.key().clone()) + .collect() + } + + async fn table( + &self, + name: &str, + ) -> datafusion_common::Result>, DataFusionError> { + Ok(self.tables.get(name).map(|table| Arc::clone(table.value()))) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> datafusion_common::Result>> { + if self.table_exist(name.as_str()) { + return exec_err!("The table {name} already exists"); + } + Ok(self.tables.insert(name, table)) + } + + fn deregister_table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + Ok(self.tables.remove(name).map(|(_, table)| table)) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} diff --git a/datafusion/catalog/src/session.rs b/datafusion/catalog/src/session.rs index 27177e2393c86..88b9669cff6d0 100644 --- a/datafusion/catalog/src/session.rs +++ b/datafusion/catalog/src/session.rs @@ -132,6 +132,9 @@ pub trait Session: Send + Sync { /// Returns a mutable reference to [`TableOptions`] fn table_options_mut(&mut self) -> &mut TableOptions; + + /// Get a new TaskContext to run in this session + fn task_ctx(&self) -> Arc; } /// Create a new task context instance from Session diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index d96944fa7a694..8fb0e4e8e584e 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use crate::datasource::{TableProvider, TableType}; use crate::error::Result; -use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::insert::{DataSink, 
DataSinkExec}; use crate::physical_plan::repartition::RepartitionExec; @@ -129,7 +128,7 @@ impl MemTable { pub async fn load( t: Arc, output_partitions: Option, - state: &SessionState, + state: &dyn Session, ) -> Result { let schema = t.schema(); let constraints = t.constraints(); diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index d640f8e37a292..d8aac6de0f2b3 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -266,6 +266,10 @@ impl Session for SessionState { fn table_options_mut(&mut self) -> &mut TableOptions { self.table_options_mut() } + + fn task_ctx(&self) -> Arc { + Arc::new(TaskContext::from(self)) + } } impl SessionState { @@ -776,9 +780,9 @@ impl SessionState { } /// Get a new TaskContext to run in this session - pub fn task_ctx(&self) -> Arc { - Arc::new(TaskContext::from(self)) - } + // pub fn task_ctx(&self) -> Arc { + // Arc::new(TaskContext::from(self)) + // } /// Return catalog list pub fn catalog_list(&self) -> &Arc { From 441031648f8711f0c923e0afdaccd2169f875eef Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 20 Mar 2025 17:13:31 +0530 Subject: [PATCH 4/7] stable waypoint 2: add document pending, get a heads up pending --- datafusion/core/src/physical_planner.rs | 39 +++---------------- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/physical_expr.rs | 36 ++++++++++++++++- 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 135b32a0a8d7a..249be982a932a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -39,7 +39,6 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use 
crate::physical_plan::explain::ExplainExec; -use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils as join_utils; use crate::physical_plan::joins::{ @@ -78,7 +77,7 @@ use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType, - Filter, JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame, + Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; @@ -1679,37 +1678,11 @@ pub fn create_aggregate_expr_and_maybe_filter( ) } -/// Create a physical sort expression from a logical expression -pub fn create_physical_sort_expr( - e: &SortExpr, - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result { - let SortExpr { - expr, - asc, - nulls_first, - } = e; - Ok(PhysicalSortExpr { - expr: create_physical_expr(expr, input_dfschema, execution_props)?, - options: SortOptions { - descending: !asc, - nulls_first: *nulls_first, - }, - }) -} - -/// Create vector of physical sort expression from a vector of logical expression -pub fn create_physical_sort_exprs( - exprs: &[SortExpr], - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result { - exprs - .iter() - .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props)) - .collect::>() -} +#[deprecated( + since = "47.0.0", + note = "use datafusion::{create_physical_sort_expr, create_physical_sort_exprs}" +)] +pub use datafusion_physical_expr::{create_physical_sort_expr, create_physical_sort_exprs}; impl DefaultPhysicalPlanner { /// Handles capturing the various plans for EXPLAIN queries diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs index 9abaeae4408ea..4a895f3546fe1 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -55,7 +55,7 @@ pub use equivalence::{ pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ create_ordering, physical_exprs_bag_equal, physical_exprs_contains, - physical_exprs_equal, PhysicalExprRef, + physical_exprs_equal, PhysicalExprRef,create_physical_sort_expr, create_physical_sort_exprs }; pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 2221bc980f6ce..80119274a67e3 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -17,10 +17,12 @@ use std::sync::Arc; -use datafusion_common::HashMap; +use datafusion_common::{DFSchema, HashMap}; +use datafusion_expr::execution_props::ExecutionProps; pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; pub use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use itertools::izip; +use crate::create_physical_expr; /// This function is similar to the `contains` method of `Vec`. It finds /// whether `expr` is among `physical_exprs`. 
@@ -146,6 +148,38 @@ pub fn create_ordering( Ok(all_sort_orders) } +/// Create a physical sort expression from a logical expression +pub fn create_physical_sort_expr( + e: &SortExpr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result { + let SortExpr { + expr, + asc, + nulls_first, + } = e; + Ok(PhysicalSortExpr { + expr: create_physical_expr(expr, input_dfschema, execution_props)?, + options: SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + }) +} + +/// Create vector of physical sort expression from a vector of logical expression +pub fn create_physical_sort_exprs( + exprs: &[SortExpr], + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result { + exprs + .iter() + .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props)) + .collect::>() +} + #[cfg(test)] mod tests { use super::*; From 81184441bc4cdd9d42f5fc3bb71210558754accc Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 20 Mar 2025 17:39:55 +0530 Subject: [PATCH 5/7] pushing for test and review --- datafusion/catalog/src/memory/mod.rs | 2 +- datafusion/catalog/src/memory/schema.rs | 3 +-- datafusion/core/src/datasource/memory.rs | 2 ++ datafusion/core/src/execution/mod.rs | 2 -- datafusion/core/src/execution/session_state.rs | 6 +++++- datafusion/core/src/physical_planner.rs | 4 +++- datafusion/physical-expr/src/lib.rs | 5 +++-- datafusion/physical-expr/src/physical_expr.rs | 2 +- 8 files changed, 16 insertions(+), 10 deletions(-) diff --git a/datafusion/catalog/src/memory/mod.rs b/datafusion/catalog/src/memory/mod.rs index 4848e9becab5e..4c5cf1a9ae9de 100644 --- a/datafusion/catalog/src/memory/mod.rs +++ b/datafusion/catalog/src/memory/mod.rs @@ -19,4 +19,4 @@ pub(crate) mod catalog; pub(crate) mod schema; pub use catalog::*; -pub use schema::*; \ No newline at end of file +pub use schema::*; diff --git a/datafusion/catalog/src/memory/schema.rs b/datafusion/catalog/src/memory/schema.rs index 
4afb28f3098e0..f1b3628f7affc 100644 --- a/datafusion/catalog/src/memory/schema.rs +++ b/datafusion/catalog/src/memory/schema.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory -//! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. +//! [`MemorySchemaProvider`]: In-memory implementations of [`SchemaProvider`]. use crate::{SchemaProvider, TableProvider}; use async_trait::async_trait; diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 8fb0e4e8e584e..b2b5a57e0316d 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -266,6 +266,8 @@ impl TableProvider for MemTable { /// # Returns /// /// * A plan that returns the number of rows written. + /// + /// [`SessionState`]: crate::execution::context::SessionState async fn insert_into( &self, _state: &dyn Session, diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs index 10aa16ffe47a5..2e3e09685bcc7 100644 --- a/datafusion/core/src/execution/mod.rs +++ b/datafusion/core/src/execution/mod.rs @@ -27,6 +27,4 @@ pub use session_state_defaults::SessionStateDefaults; // backwards compatibility pub use crate::datasource::file_format::options; - -// backwards compatibility pub use datafusion_execution::*; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index d8aac6de0f2b3..51fd6dfe7186a 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -268,7 +268,7 @@ impl Session for SessionState { } fn task_ctx(&self) -> Arc { - Arc::new(TaskContext::from(self)) + self.task_ctx() } } @@ -747,6 +747,10 @@ impl SessionState { &mut self.table_options } + /// Get a new TaskContext to run in this session + pub fn task_ctx(&self) -> Arc { + 
Arc::new(TaskContext::from(self)) + } /// Registers a [`ConfigExtension`] as a table option extension that can be /// referenced from SQL statements executed against this context. pub fn register_table_options_extension(&mut self, extension: T) { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 249be982a932a..75adbc1d7349e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1682,7 +1682,9 @@ pub fn create_aggregate_expr_and_maybe_filter( since = "47.0.0", note = "use datafusion::{create_physical_sort_expr, create_physical_sort_exprs}" )] -pub use datafusion_physical_expr::{create_physical_sort_expr, create_physical_sort_exprs}; +pub use datafusion_physical_expr::{ + create_physical_sort_expr, create_physical_sort_exprs, +}; impl DefaultPhysicalPlanner { /// Handles capturing the various plans for EXPLAIN queries diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 4a895f3546fe1..93ced2eb628d8 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -54,8 +54,9 @@ pub use equivalence::{ }; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ - create_ordering, physical_exprs_bag_equal, physical_exprs_contains, - physical_exprs_equal, PhysicalExprRef,create_physical_sort_expr, create_physical_sort_exprs + create_ordering, create_physical_sort_expr, create_physical_sort_exprs, + physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, + PhysicalExprRef, }; pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 80119274a67e3..63c4ccbb4b385 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -17,12 +17,12 @@ use std::sync::Arc; +use crate::create_physical_expr; use 
datafusion_common::{DFSchema, HashMap}; use datafusion_expr::execution_props::ExecutionProps; pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; pub use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use itertools::izip; -use crate::create_physical_expr; /// This function is similar to the `contains` method of `Vec`. It finds /// whether `expr` is among `physical_exprs`. From b52a75e790b9bf13734ae4728f78f3f5a2034b30 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 20 Mar 2025 19:13:15 +0530 Subject: [PATCH 6/7] fix: mock in test --- datafusion/catalog-listing/src/helpers.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 9ac8423042d38..7742f5f9a1532 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -1079,5 +1079,9 @@ mod tests { fn table_options_mut(&mut self) -> &mut TableOptions { unimplemented!() } + + fn task_ctx(&self) -> Arc { + unimplemented!() + } } } From ccf5b715937af0ec44e61417a5d9fa20e0148c72 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Thu, 20 Mar 2025 21:04:26 +0530 Subject: [PATCH 7/7] fix: clippy --- datafusion/core/src/execution/session_state.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 51fd6dfe7186a..515163102c417 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -747,10 +747,6 @@ impl SessionState { &mut self.table_options } - /// Get a new TaskContext to run in this session - pub fn task_ctx(&self) -> Arc { - Arc::new(TaskContext::from(self)) - } /// Registers a [`ConfigExtension`] as a table option extension that can be /// referenced from SQL statements executed against this context. 
pub fn register_table_options_extension(&mut self, extension: T) { @@ -784,9 +780,9 @@ impl SessionState { } /// Get a new TaskContext to run in this session - // pub fn task_ctx(&self) -> Arc { - // Arc::new(TaskContext::from(self)) - // } + pub fn task_ctx(&self) -> Arc { + Arc::new(TaskContext::from(self)) + } /// Return catalog list pub fn catalog_list(&self) -> &Arc {