diff --git a/datafusion/__init__.py b/datafusion/__init__.py index 6784eea5c..4667835e2 100644 --- a/datafusion/__init__.py +++ b/datafusion/__init__.py @@ -41,6 +41,7 @@ from .expr import ( Expr, + Projection, TableScan, ) @@ -58,6 +59,7 @@ "column", "literal", "TableScan", + "Projection", "DFSchema", ] diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index 66db59257..7bdbd8371 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -32,6 +32,7 @@ from datafusion.expr import ( Expr, + Projection, TableScan, ) @@ -53,12 +54,12 @@ def test_class_module_is_datafusion(): ]: assert klass.__module__ == "datafusion" + for klass in [Expr, Projection, TableScan]: + assert klass.__module__ == "datafusion.expr" + for klass in [DFSchema]: assert klass.__module__ == "datafusion.common" - for klass in [Expr, TableScan]: - assert klass.__module__ == "datafusion.expr" - def test_import_from_functions_submodule(): from datafusion.functions import abs, sin # noqa diff --git a/src/expr.rs b/src/expr.rs index dceedc1fc..f3695febf 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -24,6 +24,8 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField}; use datafusion::scalar::ScalarValue; +pub mod logical_node; +pub mod projection; pub mod table_scan; /// A PyExpr that can be used on a DataFrame @@ -140,5 +142,6 @@ impl PyExpr { pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs new file mode 100644 index 000000000..1bb3fa75f --- /dev/null +++ b/src/expr/logical_node.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::sql::logical::PyLogicalPlan; + +/// Representation of a `LogicalNode` in the overall `LogicalPlan`; +/// every "node" shares these traits in common. +pub trait LogicalNode { + /// The input plan to the current logical node instance. + fn input(&self) -> Vec; +} diff --git a/src/expr/projection.rs b/src/expr/projection.rs new file mode 100644 index 000000000..6d04e59a8 --- /dev/null +++ b/src/expr/projection.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use datafusion_common::DataFusionError; +use datafusion_expr::logical_plan::Projection; +use pyo3::prelude::*; +use std::fmt::{self, Display, Formatter}; + +use crate::common::df_schema::PyDFSchema; +use crate::errors::py_runtime_err; +use crate::expr::logical_node::LogicalNode; +use crate::expr::PyExpr; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "Projection", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyProjection { + projection: Projection, +} + +impl From for PyProjection { + fn from(projection: Projection) -> PyProjection { + PyProjection { projection } + } +} + +impl TryFrom for Projection { + type Error = DataFusionError; + + fn try_from(py_proj: PyProjection) -> Result { + Projection::try_new_with_schema( + py_proj.projection.expr, + py_proj.projection.input.clone(), + py_proj.projection.schema, + ) + } +} + +impl Display for PyProjection { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Projection + \nExpr(s): {:?} + \nInput: {:?} + \nProjected Schema: {:?}", + &self.projection.expr, &self.projection.input, &self.projection.schema, + ) + } +} + +#[pymethods] +impl PyProjection { + /// Retrieves the expressions for this `Projection` + #[pyo3(name = "projections")] + fn py_projections(&self) -> PyResult> { + Ok(self + .projection + .expr + .iter() + .map(|e| PyExpr::from(e.clone())) + .collect()) + } + + // Retrieves the input `LogicalPlan` to this `Projection` node + #[pyo3(name = "input")] + fn py_input(&self) -> PyResult { + // DataFusion makes a loose guarantee that each Projection should have an input; however, + // we check for that here since we are performing explicit index retrieval + let inputs = LogicalNode::input(self); + if !inputs.is_empty() { + return Ok(inputs[0].clone()); + } + + Err(py_runtime_err(format!( + "Expected `input` field for Projection node: {}", + self + ))) + } + + // Resulting Schema for this `Projection` node instance + #[pyo3(name = "schema")] + fn
py_schema(&self) -> PyResult { + Ok((*self.projection.schema).clone().into()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("Projection({})", self)) + } +} + +impl LogicalNode for PyProjection { + fn input(&self) -> Vec { + vec![PyLogicalPlan::from((*self.projection.input).clone())] + } +} diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs index bc7d68af5..00504b97f 100644 --- a/src/expr/table_scan.rs +++ b/src/expr/table_scan.rs @@ -19,7 +19,7 @@ use datafusion_expr::logical_plan::TableScan; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; -use crate::expr::PyExpr; +use crate::{common::df_schema::PyDFSchema, expr::PyExpr}; #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)] #[derive(Clone)] @@ -49,8 +49,8 @@ impl Display for PyTableScan { \nFilters: {:?}", &self.table_scan.table_name, &self.py_projections(), - self.table_scan.projected_schema, - self.py_filters(), + &self.py_schema(), + &self.py_filters(), ) } } @@ -89,13 +89,11 @@ impl PyTableScan { } } - /// TODO: Bindings for `DFSchema` need to exist first. Left as a - /// placeholder to display intention to add when able to. - // /// Resulting schema from the `TableScan` operation - // #[pyo3(name = "projectedSchema")] - // fn py_projected_schema(&self) -> PyResult { - // Ok(self.table_scan.projected_schema) - // } + /// Resulting schema from the `TableScan` operation + #[pyo3(name = "schema")] + fn py_schema(&self) -> PyResult { + Ok((*self.table_scan.projected_schema).clone().into()) + } /// Certain `TableProvider` physical readers offer the capability to filter rows that /// are read at read time. These `filters` are contained here.