diff --git a/datafusion/__init__.py b/datafusion/__init__.py index b6cd5178a..46206f062 100644 --- a/datafusion/__init__.py +++ b/datafusion/__init__.py @@ -41,8 +41,12 @@ ) from .expr import ( + Analyze, Expr, + Filter, + Limit, Projection, + Sort, TableScan, ) @@ -63,6 +67,10 @@ "Projection", "DFSchema", "DFField", + "Analyze", + "Sort", + "Limit", + "Filter", ] diff --git a/datafusion/pandas.py b/datafusion/pandas.py index 36e4ba2e0..f8e56512b 100644 --- a/datafusion/pandas.py +++ b/datafusion/pandas.py @@ -30,7 +30,6 @@ def register_parquet(self, name, path): self.datafusion_ctx.register_parquet(name, path) def to_pandas_expr(self, expr): - # get Python wrapper for logical expression expr = expr.to_variant() diff --git a/datafusion/polars.py b/datafusion/polars.py index e29e51156..a1bafbef8 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -31,7 +31,6 @@ def register_parquet(self, name, path): self.datafusion_ctx.register_parquet(name, path) def to_polars_expr(self, expr): - # get Python wrapper for logical expression expr = expr.to_variant() diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 6faffaf5b..efa2eded5 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -35,7 +35,6 @@ def test_create_context_no_args(): def test_create_context_with_all_valid_args(): - runtime = ( RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000) ) diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index 40b005b0d..7eb8b7cf7 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -43,6 +43,7 @@ Limit, Aggregate, Sort, + Analyze, ) @@ -68,7 +69,15 @@ def test_class_module_is_datafusion(): assert klass.__module__ == "datafusion.expr" # operators - for klass in [Projection, TableScan, Aggregate, Sort, Limit, Filter]: + for klass in [ + Projection, + TableScan, + Aggregate, + Sort, + Limit, + Filter, + Analyze, + ]: assert klass.__module__ == "datafusion.expr" # schema diff --git a/src/expr.rs b/src/expr.rs index ba01c9927..90ce6bf0e 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -31,6 +31,7 @@ use datafusion::scalar::ScalarValue; pub mod aggregate; pub mod aggregate_expr; +pub mod analyze; pub mod binary_expr; pub mod column; pub mod filter; @@ -183,5 +184,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs new file mode 100644 index 000000000..095fab037 --- /dev/null +++ b/src/expr/analyze.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_expr::logical_plan::Analyze; +use pyo3::prelude::*; +use std::fmt::{self, Display, Formatter}; + +use crate::common::df_schema::PyDFSchema; +use crate::expr::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyAnalyze { + analyze: Analyze, +} + +impl PyAnalyze { + pub fn new(analyze: Analyze) -> Self { + Self { analyze } + } +} + +impl From for PyAnalyze { + fn from(analyze: Analyze) -> PyAnalyze { + PyAnalyze { analyze } + } +} + +impl From for Analyze { + fn from(analyze: PyAnalyze) -> Self { + analyze.analyze + } +} + +impl Display for PyAnalyze { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "Analyze Table") + } +} + +#[pymethods] +impl PyAnalyze { + fn verbose(&self) -> PyResult { + Ok(self.analyze.verbose) + } + + /// Resulting Schema for this `Analyze` node instance + fn schema(&self) -> PyResult { + Ok((*self.analyze.schema).clone().into()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("Analyze({})", self)) + } +} + +impl LogicalNode for PyAnalyze { + fn input(&self) -> Vec { + vec![PyLogicalPlan::from((*self.analyze.input).clone())] + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index ce6b1fbfd..ee48f1e17 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::errors::py_runtime_err; use crate::expr::aggregate::PyAggregate; +use crate::expr::analyze::PyAnalyze; use crate::expr::filter::PyFilter; use crate::expr::limit::PyLimit; use crate::expr::projection::PyProjection; @@ -40,6 +41,10 @@ impl PyLogicalPlan { plan: Arc::new(plan), } } + + pub fn plan(&self) -> Arc { + self.plan.clone() + } } #[pymethods] @@ -47,12 +52,13 @@ impl PyLogicalPlan { /// Return the specific logical operator fn to_variant(&self, py: Python) -> PyResult { Python::with_gil(|_| match self.plan.as_ref() { - LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), - LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)), + LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)), + LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)), LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)), + LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)), - LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)), + LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), other => Err(py_runtime_err(format!( "Cannot convert this plan to a LogicalNode: {:?}", other @@ -61,7 +67,7 @@ impl PyLogicalPlan { } /// Get the inputs to this plan - pub fn inputs(&self) -> Vec { + fn inputs(&self) -> Vec { let mut inputs = vec![]; for input in self.plan.inputs() { inputs.push(input.to_owned().into()); @@ -73,19 +79,19 @@ impl PyLogicalPlan { Ok(format!("{:?}", self.plan)) } - pub fn display(&self) -> String { + fn display(&self) -> String { format!("{}", self.plan.display()) } - pub fn display_indent(&self) -> String { + fn display_indent(&self) -> String { format!("{}", self.plan.display_indent()) } - pub fn display_indent_schema(&self) -> String { + fn display_indent_schema(&self) -> String { format!("{}", self.plan.display_indent_schema()) } - pub fn display_graphviz(&self) -> String { + fn display_graphviz(&self) -> String { format!("{}", self.plan.display_indent_schema()) } }