From 6c3cb9792d04a6d82f8ba99b2279f5d6a84b51da Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 17 Feb 2023 07:57:59 -0700
Subject: [PATCH 01/13] changelog (#188)

---
 CHANGELOG.md | 48 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28c9b3334..8d47fdf5a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,18 +19,34 @@

 # Changelog

-## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-01-19)
+## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-17)

 [Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0)

 **Implemented enhancements:**

+- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
+- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
+- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
+- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
+- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
+- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
+- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
+- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
+- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
+- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
 - Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
+- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
+- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
 - support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
 - Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
+- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)

 **Fixed bugs:**

+- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
+- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
+- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
 - Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
 - Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
 - ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
@@ -42,10 +58,40 @@

 **Closed issues:**

 - Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
+- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
+- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)

 **Merged pull requests:**

+- Add ability to execute ExecutionPlan and get a stream of RecordBatch [\#186](https://github.com/apache/arrow-datafusion-python/pull/186) ([andygrove](https://github.com/andygrove))
+- Dffield bindings [\#185](https://github.com/apache/arrow-datafusion-python/pull/185) ([jdye64](https://github.com/jdye64))
+- Add bindings for DFSchema [\#183](https://github.com/apache/arrow-datafusion-python/pull/183) ([jdye64](https://github.com/jdye64))
+- test: Window functions [\#182](https://github.com/apache/arrow-datafusion-python/pull/182) ([simicd](https://github.com/simicd))
+- Add bindings for Projection [\#180](https://github.com/apache/arrow-datafusion-python/pull/180) ([jdye64](https://github.com/jdye64))
+- Table scan bindings [\#178](https://github.com/apache/arrow-datafusion-python/pull/178) ([jdye64](https://github.com/jdye64))
+- Make session configurable [\#176](https://github.com/apache/arrow-datafusion-python/pull/176) ([andygrove](https://github.com/andygrove))
+- Upgrade to DataFusion 18.0.0 [\#175](https://github.com/apache/arrow-datafusion-python/pull/175) ([andygrove](https://github.com/andygrove))
+- Use latest DataFusion rev in preparation for DF 18 release [\#174](https://github.com/apache/arrow-datafusion-python/pull/174) ([andygrove](https://github.com/andygrove))
+- Arrow type bindings [\#173](https://github.com/apache/arrow-datafusion-python/pull/173) ([jdye64](https://github.com/jdye64))
+- Pyo3 bump [\#171](https://github.com/apache/arrow-datafusion-python/pull/171) ([jdye64](https://github.com/jdye64))
+- feature: Add additional aggregation functions [\#170](https://github.com/apache/arrow-datafusion-python/pull/170) ([simicd](https://github.com/simicd))
+- Make from\_substrait\_plan return DataFrame instead of LogicalPlan [\#164](https://github.com/apache/arrow-datafusion-python/pull/164) ([andygrove](https://github.com/andygrove))
+- feature: Implement count method [\#163](https://github.com/apache/arrow-datafusion-python/pull/163) ([simicd](https://github.com/simicd))
+- CI Fixes [\#162](https://github.com/apache/arrow-datafusion-python/pull/162) ([jdye64](https://github.com/jdye64))
+- Upgrade to DataFusion 17 [\#160](https://github.com/apache/arrow-datafusion-python/pull/160) ([andygrove](https://github.com/andygrove))
+- feature: Improve string representation of datafusion classes [\#159](https://github.com/apache/arrow-datafusion-python/pull/159) ([simicd](https://github.com/simicd))
+- Make PyExecutionPlan.plan public [\#156](https://github.com/apache/arrow-datafusion-python/pull/156) ([andygrove](https://github.com/andygrove))
+- Expose methods on logical and execution plans [\#155](https://github.com/apache/arrow-datafusion-python/pull/155) ([andygrove](https://github.com/andygrove))
+- Fix clippy for new Rust version [\#154](https://github.com/apache/arrow-datafusion-python/pull/154) ([andygrove](https://github.com/andygrove))
+- Add DataFrame methods for accessing plans [\#153](https://github.com/apache/arrow-datafusion-python/pull/153) ([andygrove](https://github.com/andygrove))
+- Use DataFusion rev 5238e8c97f998b4d2cb9fab85fb182f325a1a7fb [\#150](https://github.com/apache/arrow-datafusion-python/pull/150) ([andygrove](https://github.com/andygrove))
+- build\(deps\): bump async-trait from 0.1.61 to 0.1.62 [\#148](https://github.com/apache/arrow-datafusion-python/pull/148) ([dependabot[bot]](https://github.com/apps/dependabot))
+- Rename default branch from master to main [\#147](https://github.com/apache/arrow-datafusion-python/pull/147) ([andygrove](https://github.com/andygrove))
+- Substrait bindings [\#145](https://github.com/apache/arrow-datafusion-python/pull/145) ([jdye64](https://github.com/jdye64))
+- build\(deps\): bump uuid from 0.8.2 to 1.2.2 [\#143](https://github.com/apache/arrow-datafusion-python/pull/143) ([dependabot[bot]](https://github.com/apps/dependabot))
+- Prepare for 0.8.0 release [\#141](https://github.com/apache/arrow-datafusion-python/pull/141) ([andygrove](https://github.com/andygrove))
 - Improve README and add more examples [\#137](https://github.com/apache/arrow-datafusion-python/pull/137) ([andygrove](https://github.com/andygrove))
+- test: Expand tests for built-in functions
[\#129](https://github.com/apache/arrow-datafusion-python/pull/129) ([simicd](https://github.com/simicd))
 - build\(deps\): bump object\_store from 0.5.2 to 0.5.3 [\#126](https://github.com/apache/arrow-datafusion-python/pull/126) ([dependabot[bot]](https://github.com/apps/dependabot))
 - build\(deps\): bump mimalloc from 0.1.32 to 0.1.34 [\#125](https://github.com/apache/arrow-datafusion-python/pull/125) ([dependabot[bot]](https://github.com/apps/dependabot))
 - Introduce conda directory containing datafusion-dev.yaml conda enviro… [\#124](https://github.com/apache/arrow-datafusion-python/pull/124) ([jdye64](https://github.com/jdye64))

From 3b74ddecddccadb852a073ebae64bddf43e0011e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 19 Feb 2023 11:49:24 -0700
Subject: [PATCH 02/13] Add Python wrapper for LogicalPlan::Sort (#196)

---
 datafusion/tests/test_imports.py |  3 +-
 src/expr.rs                      |  2 +
 src/expr/sort.rs                 | 94 ++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 1 deletion(-)
 create mode 100644 src/expr/sort.rs

diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index e5d958537..b7f4ef4ad 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -35,6 +35,7 @@
     Expr,
     Projection,
     TableScan,
+    Sort,
 )


@@ -55,7 +56,7 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"

-    for klass in [Expr, Projection, TableScan]:
+    for klass in [Expr, Projection, TableScan, Sort]:
         assert klass.__module__ == "datafusion.expr"

     for klass in [DFField, DFSchema]:
diff --git a/src/expr.rs b/src/expr.rs
index f3695febf..68534bcba 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -26,6 +26,7 @@ use datafusion::scalar::ScalarValue;

 pub mod logical_node;
 pub mod projection;
+pub mod sort;
 pub mod table_scan;

 /// A PyExpr that can be used on a DataFrame
@@ -143,5 +144,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::<PyExpr>()?;
     m.add_class::<projection::PyProjection>()?;
     m.add_class::<table_scan::PyTableScan>()?;
+    m.add_class::<sort::PySort>()?;
     Ok(())
 }
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
new file mode 100644
index 000000000..1d0a7f6d3
--- /dev/null
+++ b/src/expr/sort.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Sort;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Sort", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PySort {
+    sort: Sort,
+}
+
+impl From<Sort> for PySort {
+    fn from(sort: Sort) -> PySort {
+        PySort { sort }
+    }
+}
+
+impl TryFrom<PySort> for Sort {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PySort) -> Result<Self, Self::Error> {
+        Ok(agg.sort)
+    }
+}
+
+impl Display for PySort {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Sort
+            \nExpr(s): {:?}
+            \nInput: {:?}
+            \nSchema: {:?}",
+            &self.sort.expr,
+            self.sort.input,
+            self.sort.input.schema()
+        )
+    }
+}
+
+#[pymethods]
+impl PySort {
+    /// Retrieves the sort expressions for this `Sort`
+    fn sort_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .sort
+            .expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Sort` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.sort.input).clone())
+    }
+
+    /// Resulting Schema for this `Sort` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.sort.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Sort({})", self))
+    }
+}
+
+impl LogicalNode for PySort {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.sort.input).clone())]
+    }
+}

From f934dc16fbbf0e3b5eb58632f3c81a9db6cf31a5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 19 Feb 2023 12:12:56 -0700
Subject: [PATCH 03/13] Add Python wrapper for LogicalPlan::Aggregate (#195)

---
 datafusion/tests/test_imports.py |   3 +-
 src/expr.rs                      |   2 +
 src/expr/aggregate.rs            | 106 +++++++++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 1 deletion(-)
 create mode 100644 src/expr/aggregate.rs

diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index b7f4ef4ad..5a6c68506 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -35,6 +35,7 @@
     Expr,
     Projection,
     TableScan,
+    Aggregate,
     Sort,
 )


@@ -56,7 +57,7 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"

-    for klass in [Expr, Projection, TableScan, Sort]:
+    for klass in [Expr, Projection, TableScan, Aggregate, Sort]:
         assert klass.__module__ == "datafusion.expr"

     for klass in [DFField, DFSchema]:
diff --git a/src/expr.rs b/src/expr.rs
index 68534bcba..15359d400 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -24,6 +24,7 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};

 use datafusion::scalar::ScalarValue;

+pub mod aggregate;
 pub mod logical_node;
 pub mod projection;
 pub mod sort;
 pub mod table_scan;
@@ -144,6 +145,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::<PyExpr>()?;
     m.add_class::<projection::PyProjection>()?;
     m.add_class::<table_scan::PyTableScan>()?;
+    m.add_class::<aggregate::PyAggregate>()?;
     m.add_class::<sort::PySort>()?;
     Ok(())
 }
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
new file mode 100644
index 000000000..98d1f554b
--- /dev/null
+++ b/src/expr/aggregate.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Aggregate;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Aggregate", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregate {
+    aggregate: Aggregate,
+}
+
+impl From<Aggregate> for PyAggregate {
+    fn from(aggregate: Aggregate) -> PyAggregate {
+        PyAggregate { aggregate }
+    }
+}
+
+impl TryFrom<PyAggregate> for Aggregate {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
+        Ok(agg.aggregate)
+    }
+}
+
+impl Display for PyAggregate {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Aggregate
+            \nGroupBy(s): {:?}
+            \nAggregate(s): {:?}
+            \nInput: {:?}
+            \nProjected Schema: {:?}",
+            &self.aggregate.group_expr,
+            &self.aggregate.aggr_expr,
+            self.aggregate.input,
+            self.aggregate.schema
+        )
+    }
+}
+
+#[pymethods]
+impl PyAggregate {
+    /// Retrieves the grouping expressions for this `Aggregate`
+    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .group_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the aggregate expressions for this `Aggregate`
+    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .aggr_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Aggregate` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.aggregate.input).clone())
+    }
+
+    /// Resulting Schema for this `Aggregate` node instance
+    fn schema(&self) -> PyDFSchema {
+        (*self.aggregate.schema).clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Aggregate({})", self))
+    }
+}
+
+impl LogicalNode for PyAggregate {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
+    }
+}

From b8ef9bf54f148fe256ae2f67382f6625e6ceff20 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 19 Feb 2023 12:50:54 -0700
Subject: [PATCH 04/13] Add Python wrapper for LogicalPlan::Limit (#193)

---
 datafusion/tests/test_imports.py |  3 +-
 src/expr.rs                      |  2 +
 src/expr/limit.rs                | 88 ++++++++++++++++++++++++++++++++
 src/expr/projection.rs           | 39 ++++----------
 4 files changed, 101 insertions(+), 31 deletions(-)
 create mode 100644 src/expr/limit.rs

diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index 5a6c68506..ee47b0e6b 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -35,6 +35,7 @@
     Expr,
     Projection,
     TableScan,
+    Limit,
     Aggregate,
     Sort,
 )


@@ -57,7 +58,7 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"

-    for klass in [Expr, Projection, TableScan, Aggregate, Sort]:
+    for klass in [Expr, Projection, TableScan, Aggregate, Sort, Limit]:
         assert klass.__module__ == "datafusion.expr"

     for klass in [DFField, DFSchema]:
diff --git a/src/expr.rs b/src/expr.rs
index 15359d400..7ef9407cb 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -25,6 +25,7 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
 use datafusion::scalar::ScalarValue;

 pub mod aggregate;
+pub mod limit;
 pub mod logical_node;
 pub mod projection;
 pub mod sort;
@@ -145,6 +146,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::<PyExpr>()?;
     m.add_class::<projection::PyProjection>()?;
     m.add_class::<table_scan::PyTableScan>()?;
+    m.add_class::<limit::PyLimit>()?;
     m.add_class::<aggregate::PyAggregate>()?;
     m.add_class::<sort::PySort>()?;
     Ok(())
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
new file mode 100644
index 000000000..a50e5b8aa
--- /dev/null
+++ b/src/expr/limit.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Limit;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLimit {
+    limit: Limit,
+}
+
+impl From<Limit> for PyLimit {
+    fn from(limit: Limit) -> PyLimit {
+        PyLimit { limit }
+    }
+}
+
+impl From<PyLimit> for Limit {
+    fn from(limit: PyLimit) -> Self {
+        limit.limit
+    }
+}
+
+impl Display for PyLimit {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Limit
+            \nSkip: {}
+            \nFetch: {:?}
+            \nInput: {:?}",
+            &self.limit.skip, &self.limit.fetch, &self.limit.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyLimit {
+    /// Retrieves the skip value for this `Limit`
+    fn skip(&self) -> usize {
+        self.limit.skip
+    }
+
+    /// Retrieves the fetch value for this `Limit`
+    fn fetch(&self) -> Option<usize> {
+        self.limit.fetch
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Limit` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.limit.input).clone())
+    }
+
+    /// Resulting Schema for this `Limit` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.limit.input.schema().as_ref().clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Limit({})", self))
+    }
+}
+
+impl LogicalNode for PyLimit {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.limit.input).clone())]
+    }
+}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 6d04e59a8..2d4363283 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -15,13 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.

-use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::Projection;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};

 use crate::common::df_schema::PyDFSchema;
-use crate::errors::py_runtime_err;
 use crate::expr::logical_node::LogicalNode;
 use crate::expr::PyExpr;
 use crate::sql::logical::PyLogicalPlan;
@@ -38,15 +36,9 @@ impl From<Projection> for PyProjection {
     }
 }

-impl TryFrom<PyProjection> for Projection {
-    type Error = DataFusionError;
-
-    fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
-        Projection::try_new_with_schema(
-            py_proj.projection.expr,
-            py_proj.projection.input.clone(),
-            py_proj.projection.schema,
-        )
+impl From<PyProjection> for Projection {
+    fn from(proj: PyProjection) -> Self {
+        proj.projection
     }
 }

@@ -66,8 +58,7 @@ impl Display for PyProjection {
 #[pymethods]
 impl PyProjection {
     /// Retrieves the expressions for this `Projection`
-    #[pyo3(name = "projections")]
-    fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+    fn projections(&self) -> PyResult<Vec<PyExpr>> {
         Ok(self
             .projection
             .expr
@@ -76,25 +67,13 @@
             .collect())
     }

-    // Retrieves the input `LogicalPlan` to this `Projection` node
-    #[pyo3(name = "input")]
-    fn py_input(&self) -> PyResult<PyLogicalPlan> {
-        // DataFusion make a loose guarantee that each Projection should have an input, however
-        // we check for that hear since we are performing explicit index retrieval
-        let inputs = LogicalNode::input(self);
-        if !inputs.is_empty() {
-            return Ok(inputs[0].clone());
-        }
-
-        Err(py_runtime_err(format!(
-            "Expected `input` field for Projection node: {}",
-            self
-        )))
+    /// Retrieves the input `LogicalPlan` to this `Projection` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.projection.input).clone())
     }

-    // Resulting Schema for this `Projection` node instance
-    #[pyo3(name = "schema")]
-    fn py_schema(&self) -> PyResult<PyDFSchema> {
+    /// Resulting Schema for this `Projection` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
         Ok((*self.projection.schema).clone().into())
     }

From 312427832627f981b71e9de69c7e7e99fa704306 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 19 Feb 2023 16:37:58 -0700
Subject: [PATCH 05/13] Add Python wrapper for LogicalPlan::Filter (#192)

* Add Python wrapper for LogicalPlan::Filter

* clippy

* clippy

* Update src/expr/filter.rs

Co-authored-by: Liang-Chi Hsieh

---------

Co-authored-by: Liang-Chi Hsieh
---
 datafusion/tests/test_imports.py |  3 +-
 src/expr.rs                      |  2 +
 src/expr/filter.rs               | 83 ++++++++++++++++++++++++++++++++
 3 files changed, 87 insertions(+), 1 deletion(-)
 create mode 100644 src/expr/filter.rs

diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index ee47b0e6b..dfc1f6535 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -35,6 +35,7 @@
     Expr,
     Projection,
     TableScan,
+    Filter,
     Limit,
     Aggregate,
     Sort,
 )


@@ -58,7 +59,7 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"

-    for klass in [Expr, Projection, TableScan, Aggregate, Sort, Limit]:
+    for klass in [Expr, Projection, TableScan, Aggregate, Sort, Limit, Filter]:
         assert klass.__module__ == "datafusion.expr"

     for klass in [DFField, DFSchema]:
diff --git a/src/expr.rs b/src/expr.rs
index 7ef9407cb..adb9e55a0 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -25,6 +25,7 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
 use datafusion::scalar::ScalarValue;

 pub mod aggregate;
+pub mod filter;
 pub mod limit;
 pub mod logical_node;
 pub mod projection;
@@ -146,6 +147,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::<PyExpr>()?;
     m.add_class::<projection::PyProjection>()?;
     m.add_class::<table_scan::PyTableScan>()?;
+    m.add_class::<filter::PyFilter>()?;
     m.add_class::<limit::PyLimit>()?;
     m.add_class::<aggregate::PyAggregate>()?;
     m.add_class::<sort::PySort>()?;
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
new file mode 100644
index 000000000..b7b48b9d2
--- /dev/null
+++ b/src/expr/filter.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Filter;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Filter", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyFilter {
+    filter: Filter,
+}
+
+impl From<Filter> for PyFilter {
+    fn from(filter: Filter) -> PyFilter {
+        PyFilter { filter }
+    }
+}
+
+impl From<PyFilter> for Filter {
+    fn from(filter: PyFilter) -> Self {
+        filter.filter
+    }
+}
+
+impl Display for PyFilter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Filter
+            \nPredicate: {:?}
+            \nInput: {:?}",
+            &self.filter.predicate, &self.filter.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyFilter {
+    /// Retrieves the predicate expression for this `Filter`
+    fn predicate(&self) -> PyExpr {
+        PyExpr::from(self.filter.predicate.clone())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Filter` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.filter.input).clone())
+    }
+
+    /// Resulting Schema for this `Filter` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.filter.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> String {
+        format!("Filter({})", self)
+    }
+}
+
+impl LogicalNode for PyFilter {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.filter.input).clone())]
+    }
+}

From d62cbdfa4d963e01627b33e2b6a5d41b836fb940 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 20 Feb 2023 08:39:06 -0700
Subject: [PATCH 06/13] Add tests for recently added functionality (#199)

---
 datafusion/tests/test_expr.py | 83 +++++++++++++++++++++++++++++++++++
 src/sql/logical.rs            | 23 ++++++++++
 2 files changed, 106 insertions(+)
 create mode 100644 datafusion/tests/test_expr.py

diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
new file mode 100644
index 000000000..4a7db879a
--- /dev/null
+++ b/datafusion/tests/test_expr.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion.expr import (
+    Projection,
+    Filter,
+    Aggregate,
+    Limit,
+    Sort,
+    TableScan,
+)
+import pytest
+
+
+@pytest.fixture
+def test_ctx():
+    ctx = SessionContext()
+    ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
+    return ctx
+
+
+def test_projection(test_ctx):
+    df = test_ctx.sql("select c1, 123, c1 < 123 from test")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, TableScan)
+
+
+def test_filter(test_ctx):
+    df = test_ctx.sql("select c1 from test WHERE c1 > 5")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, Filter)
+
+
+def test_limit(test_ctx):
+    df = test_ctx.sql("select c1 from test LIMIT 10")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Limit)
+
+
+def test_aggregate(test_ctx):
+    df = test_ctx.sql("select c1, COUNT(*) from test GROUP BY c1")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, Aggregate)
+
+
+def test_sort(test_ctx):
+    df = test_ctx.sql("select c1 from test order by c1")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Sort)
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index dcd7baa58..08d19619d 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -17,6 +17,13 @@

 use std::sync::Arc;

+use crate::errors::py_runtime_err;
+use crate::expr::aggregate::PyAggregate;
+use crate::expr::filter::PyFilter;
+use crate::expr::limit::PyLimit;
+use crate::expr::projection::PyProjection;
+use crate::expr::sort::PySort;
+use crate::expr::table_scan::PyTableScan;
 use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;

@@ -37,6 +44,22 @@ impl PyLogicalPlan {

 #[pymethods]
 impl PyLogicalPlan {
+    /// Return the specific logical operator
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match self.plan.as_ref() {
+            LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
+            LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
+            LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
+            LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
+            LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)),
+            LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this plan to a LogicalNode: {:?}",
+                other
+            ))),
+        })
+    }
+
     /// Get the inputs to this plan
     pub fn inputs(&self) -> Vec<PyLogicalPlan> {
         let mut inputs = vec![];

From ca8b0551cd9dc7f6832f94a8b44f31cb5b3aa218 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 20 Feb 2023 10:03:21 -0700
Subject: [PATCH 07/13] Add experimental support for executing SQL with Polars
 and Pandas (#190)

---
 README.md                        |  3 ++
 datafusion/pandas.py             | 62 +++++++++++++++++++++++
 datafusion/polars.py             | 85 ++++++++++++++++++++++++++++++++
 datafusion/tests/test_expr.py    | 39 ++++++++++++---
 datafusion/tests/test_imports.py | 12 ++++-
 examples/README.md               | 12 +++++
 examples/sql-on-pandas.py        | 26 ++++++++++
 examples/sql-on-polars.py        | 28 +++++++++++
 src/expr.rs                      | 32 ++++++++++++
 src/expr/aggregate_expr.rs       | 73 +++++++++++++++++++++++++++
 src/expr/binary_expr.rs          | 57 +++++++++++++++++++++
 src/expr/column.rs               | 60 ++++++++++++++++++++++
 src/expr/literal.rs              | 74 +++++++++++++++++++++++++++
 src/expr/projection.rs           |  6 +++
 src/expr/table_scan.rs           | 15 ++++++
 src/sql/logical.rs               |  8 ++-
 16 files changed, 583 insertions(+), 9 deletions(-)
 create mode 100644 datafusion/pandas.py
 create mode 100644 datafusion/polars.py
 create mode 100644 examples/sql-on-pandas.py
 create mode 100644 examples/sql-on-polars.py
 create mode 100644 src/expr/aggregate_expr.rs
 create mode 100644 src/expr/binary_expr.rs
 create mode 100644 src/expr/column.rs
 create mode 100644 src/expr/literal.rs

diff --git a/README.md b/README.md
index ab89ff6dd..d465ebcd0 100644
--- a/README.md
+++ b/README.md
@@ -36,6 +36,9 @@
 from having to lock the GIL when running those operations.
 Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks.

+There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any
+drop-in replacements for Pandas.
+
 Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).

 ## Example Usage
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
new file mode 100644
index 000000000..36e4ba2e0
--- /dev/null
+++ b/datafusion/pandas.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pandas as pd
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_pandas_expr(self, expr):
+
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_pandas_df(self, plan):
+        # recurse down first to translate inputs into pandas data frames
+        inputs = [self.to_pandas_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_pandas_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return pd.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
new file mode 100644
index 000000000..e29e51156
--- /dev/null
+++ b/datafusion/polars.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import polars
+import datafusion
+from datafusion.expr import Projection, TableScan, Aggregate
+from datafusion.expr import Column, AggregateFunction
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_polars_expr(self, expr):
+
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return polars.col(expr.name())
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_polars_df(self, plan):
+        # recurse down first to translate inputs into Polars data frames
+        inputs = [self.to_polars_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_polars_expr(expr) for expr in node.projections()]
+            return inputs[0].select(*args)
+        elif isinstance(node, Aggregate):
+            groupby_expr = [
+                self.to_polars_expr(expr) for expr in node.group_by_exprs()
+            ]
+            aggs = []
+ for expr in node.aggregate_exprs(): + expr = expr.to_variant() + if isinstance(expr, AggregateFunction): + if expr.aggregate_type() == "COUNT": + aggs.append(polars.count().alias("{}".format(expr))) + else: + raise Exception( + "Unsupported aggregate function {}".format( + expr.aggregate_type() + ) + ) + else: + raise Exception( + "Unsupported aggregate function {}".format(expr) + ) + df = inputs[0].groupby(groupby_expr).agg(aggs) + return df + elif isinstance(node, TableScan): + return polars.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_polars_df(plan) diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py index 4a7db879a..143eea6ff 100644 --- a/datafusion/tests/test_expr.py +++ b/datafusion/tests/test_expr.py @@ -16,6 +16,7 @@ # under the License. 
from datafusion import SessionContext +from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction from datafusion.expr import ( Projection, Filter, @@ -41,6 +42,24 @@ def test_projection(test_ctx): plan = plan.to_variant() assert isinstance(plan, Projection) + expr = plan.projections() + + col1 = expr[0].to_variant() + assert isinstance(col1, Column) + assert col1.name() == "c1" + assert col1.qualified_name() == "test.c1" + + col2 = expr[1].to_variant() + assert isinstance(col2, Literal) + assert col2.data_type() == "Int64" + assert col2.value_i64() == 123 + + col3 = expr[2].to_variant() + assert isinstance(col3, BinaryExpr) + assert isinstance(col3.left().to_variant(), Column) + assert col3.op() == "<" + assert isinstance(col3.right().to_variant(), Literal) + plan = plan.input().to_variant() assert isinstance(plan, TableScan) @@ -64,15 +83,23 @@ def test_limit(test_ctx): assert isinstance(plan, Limit) -def test_aggregate(test_ctx): - df = test_ctx.sql("select c1, COUNT(*) from test GROUP BY c1") +def test_aggregate_query(test_ctx): + df = test_ctx.sql("select c1, count(*) from test group by c1") plan = df.logical_plan() - plan = plan.to_variant() - assert isinstance(plan, Projection) + projection = plan.to_variant() + assert isinstance(projection, Projection) - plan = plan.input().to_variant() - assert isinstance(plan, Aggregate) + aggregate = projection.input().to_variant() + assert isinstance(aggregate, Aggregate) + + col1 = aggregate.group_by_exprs()[0].to_variant() + assert isinstance(col1, Column) + assert col1.name() == "c1" + assert col1.qualified_name() == "test.c1" + + col2 = aggregate.aggregate_exprs()[0].to_variant() + assert isinstance(col2, AggregateFunction) def test_sort(test_ctx): diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index dfc1f6535..40b005b0d 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -33,6 +33,10 @@ from datafusion.expr import ( Expr, + 
Column, + Literal, + BinaryExpr, + AggregateFunction, Projection, TableScan, Filter, @@ -59,9 +63,15 @@ def test_class_module_is_datafusion(): ]: assert klass.__module__ == "datafusion" - for klass in [Expr, Projection, TableScan, Aggregate, Sort, Limit, Filter]: + # expressions + for klass in [Expr, Column, Literal, BinaryExpr, AggregateFunction]: assert klass.__module__ == "datafusion.expr" + # operators + for klass in [Projection, TableScan, Aggregate, Sort, Limit, Filter]: + assert klass.__module__ == "datafusion.expr" + + # schema for klass in [DFField, DFSchema]: assert klass.__module__ == "datafusion.common" diff --git a/examples/README.md b/examples/README.md index a3ae0ba42..e73636642 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,9 +19,21 @@ # DataFusion Python Examples +Some of the examples rely on data which can be downloaded from the following site: + +- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page + +Here is a direct link to the file used in the examples: + +- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet + +## Examples + - [Query a Parquet file using SQL](./sql-parquet.py) - [Query a Parquet file using the DataFrame API](./dataframe-parquet.py) - [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py) - [Query PyArrow Data](./query-pyarrow-data.py) - [Register a Python UDF with DataFusion](./python-udf.py) - [Register a Python UDAF with DataFusion](./python-udaf.py) +- [Executing SQL on Polars](./sql-on-polars.py) +- [Executing SQL on Pandas](./sql-on-pandas.py) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py new file mode 100644 index 000000000..8a2d59333 --- /dev/null +++ b/examples/sql-on-pandas.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion.pandas import SessionContext + + +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py new file mode 100644 index 000000000..0173b68b4 --- /dev/null +++ b/examples/sql-on-polars.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from datafusion.polars import SessionContext + + +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql( + "select passenger_count, count(*) from taxi group by passenger_count" +) +print(df) diff --git a/src/expr.rs b/src/expr.rs index adb9e55a0..ba01c9927 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -22,11 +22,20 @@ use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::PyArrowType; use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField}; +use crate::errors::py_runtime_err; +use crate::expr::aggregate_expr::PyAggregateFunction; +use crate::expr::binary_expr::PyBinaryExpr; +use crate::expr::column::PyColumn; +use crate::expr::literal::PyLiteral; use datafusion::scalar::ScalarValue; pub mod aggregate; +pub mod aggregate_expr; +pub mod binary_expr; +pub mod column; pub mod filter; pub mod limit; +pub mod literal; pub mod logical_node; pub mod projection; pub mod sort; @@ -53,6 +62,22 @@ impl From for PyExpr { #[pymethods] impl PyExpr { + /// Return the specific expression + fn to_variant(&self, py: Python) -> PyResult { + Python::with_gil(|_| match &self.expr { + Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)), + Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)), + Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)), + Expr::AggregateFunction(expr) => { + Ok(PyAggregateFunction::from(expr.clone()).into_py(py)) + } + other => Err(py_runtime_err(format!( + "Cannot convert this Expr to a Python object: {:?}", + other + ))), + }) + } + fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr { let expr = match op { CompareOp::Lt => self.expr.clone().lt(other.expr), @@ -144,7 +169,14 @@ impl PyExpr { /// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { + // expressions 
m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + // operators m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs new file mode 100644 index 000000000..180105180 --- /dev/null +++ b/src/expr/aggregate_expr.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::expr::PyExpr; +use datafusion_expr::expr::AggregateFunction; +use pyo3::prelude::*; +use std::fmt::{Display, Formatter}; + +#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyAggregateFunction { + aggr: AggregateFunction, +} + +impl From for AggregateFunction { + fn from(aggr: PyAggregateFunction) -> Self { + aggr.aggr + } +} + +impl From for PyAggregateFunction { + fn from(aggr: AggregateFunction) -> PyAggregateFunction { + PyAggregateFunction { aggr } + } +} + +impl Display for PyAggregateFunction { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let args: Vec = self.aggr.args.iter().map(|expr| expr.to_string()).collect(); + write!(f, "{}({})", self.aggr.fun, args.join(", ")) + } +} + +#[pymethods] +impl PyAggregateFunction { + /// Get the aggregate type, such as "MIN", or "MAX" + fn aggregate_type(&self) -> String { + format!("{}", self.aggr.fun) + } + + /// is this a distinct aggregate such as `COUNT(DISTINCT expr)` + fn is_distinct(&self) -> bool { + self.aggr.distinct + } + + /// Get the arguments to the aggregate function + fn args(&self) -> Vec { + self.aggr + .args + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect() + } + + /// Get a String representation of this column + fn __repr__(&self) -> String { + format!("{}", self) + } +} diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs new file mode 100644 index 000000000..5f382b770 --- /dev/null +++ b/src/expr/binary_expr.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::PyExpr; +use datafusion_expr::BinaryExpr; +use pyo3::prelude::*; + +#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyBinaryExpr { + expr: BinaryExpr, +} + +impl From for BinaryExpr { + fn from(expr: PyBinaryExpr) -> Self { + expr.expr + } +} + +impl From for PyBinaryExpr { + fn from(expr: BinaryExpr) -> PyBinaryExpr { + PyBinaryExpr { expr } + } +} + +#[pymethods] +impl PyBinaryExpr { + fn left(&self) -> PyExpr { + self.expr.left.as_ref().clone().into() + } + + fn right(&self) -> PyExpr { + self.expr.right.as_ref().clone().into() + } + + fn op(&self) -> String { + format!("{}", self.expr.op) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("{}", self.expr)) + } +} diff --git a/src/expr/column.rs b/src/expr/column.rs new file mode 100644 index 000000000..16b8bce3c --- /dev/null +++ b/src/expr/column.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::Column; +use pyo3::prelude::*; + +#[pyclass(name = "Column", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyColumn { + pub col: Column, +} + +impl PyColumn { + pub fn new(col: Column) -> Self { + Self { col } + } +} + +impl From for PyColumn { + fn from(col: Column) -> PyColumn { + PyColumn { col } + } +} + +#[pymethods] +impl PyColumn { + /// Get the column name + fn name(&self) -> String { + self.col.name.clone() + } + + /// Get the column relation + fn relation(&self) -> Option { + self.col.relation.clone() + } + + /// Get the fully-qualified column name + fn qualified_name(&self) -> String { + self.col.flat_name() + } + + /// Get a String representation of this column + fn __repr__(&self) -> String { + self.qualified_name() + } +} diff --git a/src/expr/literal.rs b/src/expr/literal.rs new file mode 100644 index 000000000..27674ce6f --- /dev/null +++ b/src/expr/literal.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::py_runtime_err; +use datafusion_common::ScalarValue; +use pyo3::prelude::*; + +#[pyclass(name = "Literal", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyLiteral { + pub value: ScalarValue, +} + +impl From for ScalarValue { + fn from(lit: PyLiteral) -> ScalarValue { + lit.value + } +} + +impl From for PyLiteral { + fn from(value: ScalarValue) -> PyLiteral { + PyLiteral { value } + } +} + +#[pymethods] +impl PyLiteral { + /// Get the data type of this literal value + fn data_type(&self) -> String { + format!("{}", self.value.get_datatype()) + } + + fn value_i32(&self) -> PyResult { + if let ScalarValue::Int32(Some(n)) = &self.value { + Ok(*n) + } else { + Err(py_runtime_err("Cannot access value as i32")) + } + } + + fn value_i64(&self) -> PyResult { + if let ScalarValue::Int64(Some(n)) = &self.value { + Ok(*n) + } else { + Err(py_runtime_err("Cannot access value as i64")) + } + } + + fn value_str(&self) -> PyResult { + if let ScalarValue::Utf8(Some(str)) = &self.value { + Ok(str.clone()) + } else { + Err(py_runtime_err("Cannot access value as string")) + } + } + + fn __repr__(&self) -> PyResult { + Ok(format!("{}", self.value)) + } +} diff --git a/src/expr/projection.rs b/src/expr/projection.rs index 2d4363283..4c158f763 100644 --- a/src/expr/projection.rs +++ b/src/expr/projection.rs @@ -30,6 +30,12 @@ pub struct PyProjection { projection: Projection, } +impl PyProjection { + pub fn new(projection: Projection) -> Self { + Self { projection } + } +} + impl From for PyProjection { fn from(projection: 
Projection) -> PyProjection { PyProjection { projection } diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs index 00504b97f..2784523e7 100644 --- a/src/expr/table_scan.rs +++ b/src/expr/table_scan.rs @@ -19,6 +19,8 @@ use datafusion_expr::logical_plan::TableScan; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use crate::expr::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; use crate::{common::df_schema::PyDFSchema, expr::PyExpr}; #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)] @@ -27,6 +29,12 @@ pub struct PyTableScan { table_scan: TableScan, } +impl PyTableScan { + pub fn new(table_scan: TableScan) -> Self { + Self { table_scan } + } +} + impl From for TableScan { fn from(tbl_scan: PyTableScan) -> TableScan { tbl_scan.table_scan @@ -117,3 +125,10 @@ impl PyTableScan { Ok(format!("TableScan({})", self)) } } + +impl LogicalNode for PyTableScan { + fn input(&self) -> Vec { + // table scans are leaf nodes and do not have inputs + vec![] + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 08d19619d..ce6b1fbfd 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -49,10 +49,10 @@ impl PyLogicalPlan { Python::with_gil(|_| match self.plan.as_ref() { LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), - LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)), + LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)), LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)), LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)), - LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)), + LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)), other => Err(py_runtime_err(format!( "Cannot convert this plan to a LogicalNode: {:?}", other @@ -69,6 +69,10 @@ impl 
PyLogicalPlan { inputs } + fn __repr__(&self) -> PyResult { + Ok(format!("{:?}", self.plan)) + } + pub fn display(&self) -> String { format!("{}", self.plan.display()) } From bb004eef998771a5ac0d650afcfd0a95e747199a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Feb 2023 09:49:04 -0700 Subject: [PATCH 08/13] Run `maturin develop` instead of `cargo build` in verification script (#200) --- dev/release/verify-release-candidate.sh | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index fee276c11..be86f69e0 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -125,15 +125,19 @@ test_source_distribution() { git clone https://github.com/apache/arrow-testing.git testing git clone https://github.com/apache/parquet-testing.git parquet-testing - cargo build - cargo test --all + python3 -m venv venv + source venv/bin/activate + python3 -m pip install -U pip + python3 -m pip install -r requirements-310.txt + maturin develop + + #TODO: we should really run tests here as well + #python3 -m pytest if ( find -iname 'Cargo.toml' | xargs grep SNAPSHOT ); then echo "Cargo.toml version should not contain SNAPSHOT for releases" exit 1 fi - - cargo publish --dry-run } TEST_SUCCESS=no From 774ea70eabdf2fefbbd48ab36c85bff346e6c0e1 Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Wed, 22 Feb 2023 01:24:28 +0100 Subject: [PATCH 09/13] Implement `to_pandas()` (#197) * Implement to_pandas() * Update documentation * Write unit test --- README.md | 12 +++--------- datafusion/tests/test_dataframe.py | 11 +++++++++++ examples/sql-to-pandas.py | 10 ++-------- src/dataframe.rs | 18 ++++++++++++++++++ 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index d465ebcd0..65f6ef3e0 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ from having to lock the 
GIL when running those operations. Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks. -There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any +There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any drop-in replacements for Pandas. Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html). @@ -70,17 +70,11 @@ df = ctx.sql("select passenger_count, count(*) " "group by passenger_count " "order by passenger_count") -# collect as list of pyarrow.RecordBatch -results = df.collect() - -# get first batch -batch = results[0] - # convert to Pandas -df = batch.to_pandas() +pandas_df = df.to_pandas() # create a chart -fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure() +fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure() fig.savefig('chart.png') ``` diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index 18946888f..292a4b00c 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -533,3 +533,14 @@ def test_cache(df): def test_count(df): # Get number of rows assert df.count() == 3 + + +def test_to_pandas(df): + # Skip test if pandas is not installed + pd = pytest.importorskip("pandas") + + # Convert datafusion dataframe to pandas dataframe + pandas_df = df.to_pandas() + assert type(pandas_df) == pd.DataFrame + assert pandas_df.shape == (3, 3) + assert set(pandas_df.columns) == {"a", "b", "c"} diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py index 3569e6d8c..3e99b22de 100644 --- a/examples/sql-to-pandas.py +++ b/examples/sql-to-pandas.py @@ -33,17 +33,11 @@ "order by passenger_count" ) -# collect as list of pyarrow.RecordBatch 
-results = df.collect() - -# get first batch -batch = results[0] - # convert to Pandas -df = batch.to_pandas() +pandas_df = df.to_pandas() # create a chart -fig = df.plot( +fig = pandas_df.plot( kind="bar", title="Trip Count by Number of Passengers" ).get_figure() fig.savefig("chart.png") diff --git a/src/dataframe.rs b/src/dataframe.rs index 4b9fbca6c..a1c68dd1c 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -313,6 +313,24 @@ impl PyDataFrame { Ok(()) } + /// Convert to pandas dataframe with pyarrow + /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame + fn to_pandas(&self, py: Python) -> PyResult { + let batches = self.collect(py); + + Python::with_gil(|py| { + // Instantiate pyarrow Table object and use its from_batches method + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, batches); + let table: PyObject = table_class.call_method1("from_batches", args)?.into(); + + // Use Table.to_pandas() method to convert batches to pandas dataframe + // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas + let result = table.call_method0(py, "to_pandas")?; + Ok(result) + }) + } + // Executes this DataFrame to get the total number of rows. fn count(&self, py: Python) -> PyResult { Ok(wait_for_future(py, self.df.as_ref().clone().count())?) 
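The experimental pandas/polars backends introduced above share one pattern: recurse into the plan's inputs first, call `to_variant()` to get a concrete operator node, then dispatch on its Python type. A minimal pure-Python analogue of that visitor — hypothetical `TableScan`/`Projection` classes standing in for the real `datafusion.expr` bindings, and lists of dicts standing in for DataFrames:

```python
# Simplified stand-ins for the real datafusion.expr variant classes (hypothetical).
class TableScan:
    def __init__(self, table_name, inputs=()):
        self.table_name = table_name
        self.inputs = list(inputs)


class Projection:
    def __init__(self, columns, inputs):
        self.columns = columns
        self.inputs = list(inputs)


# Pretend "registered parquet tables": table name -> rows.
TABLES = {
    "taxi": [
        {"passenger_count": 1, "fare": 7.5},
        {"passenger_count": 2, "fare": 12.0},
    ]
}


def to_rows(plan):
    # Recurse down first, translating inputs before the node itself,
    # exactly as to_pandas_df/to_polars_df do in the patches above.
    inputs = [to_rows(p) for p in plan.inputs]
    if isinstance(plan, Projection):
        return [{c: row[c] for c in plan.columns} for row in inputs[0]]
    elif isinstance(plan, TableScan):
        return TABLES[plan.table_name]
    else:
        raise Exception("unsupported logical operator: {}".format(type(plan)))


plan = Projection(["passenger_count"], [TableScan("taxi")])
print(to_rows(plan))  # [{'passenger_count': 1}, {'passenger_count': 2}]
```

The real backends differ only in the leaf and projection translations: pandas indexes with `df[cols]`, polars builds `polars.col(...)` expressions, and the scan reads the registered parquet path.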
From d3d42685255246f93933f3abe82301f3db18a52d Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 21 Feb 2023 20:50:07 -0500 Subject: [PATCH 10/13] Add support for cudf as a physical execution engine (#205) --- Cargo.lock | 20 ++++----- conda/environments/datafusion-dev.yaml | 5 ++- datafusion/cudf.py | 62 ++++++++++++++++++++++++++ examples/sql-on-cudf.py | 26 +++++++++++ 4 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 datafusion/cudf.py create mode 100644 examples/sql-on-cudf.py diff --git a/Cargo.lock b/Cargo.lock index 5059afaac..04a2ea8d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1134,9 +1134,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes", "fnv", @@ -1385,9 +1385,9 @@ checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "libflate" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093" +checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0" dependencies = [ "adler32", "crc32fast", @@ -1396,9 +1396,9 @@ dependencies = [ [[package]] name = "libflate_lz77" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a" +checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf" dependencies = [ "rle-decode-fast", ] @@ -2359,9 +2359,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" dependencies = [ "autocfg", ] @@ -2635,9 +2635,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" dependencies = [ "futures-core", "pin-project-lite", diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml index 0e17e1699..d9405e4fe 100644 --- a/conda/environments/datafusion-dev.yaml +++ b/conda/environments/datafusion-dev.yaml @@ -28,7 +28,7 @@ dependencies: - pytest - toml - importlib_metadata -- python>=3.7,<3.11 +- python>=3.10 # Packages useful for building distributions and releasing - mamba - conda-build @@ -38,4 +38,7 @@ dependencies: - pydata-sphinx-theme==0.8.0 - myst-parser - jinja2 +# GPU packages +- cudf +- cudatoolkit=11.8 name: datafusion-dev diff --git a/datafusion/cudf.py b/datafusion/cudf.py new file mode 100644 index 000000000..c38819c62 --- /dev/null +++ b/datafusion/cudf.py @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cudf
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_cudf_expr(self, expr):
+
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_cudf_df(self, plan):
+        # recurse down first to translate inputs into cudf data frames
+        inputs = [self.to_cudf_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_cudf_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return cudf.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_cudf_df(plan)
diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py
new file mode 100644
index 000000000..407cb1f00
--- /dev/null
+++ b/examples/sql-on-cudf.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion.cudf import SessionContext + + +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/home/jeremy/Downloads/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df) From 162fdd9998bf0f9fbb6b31289c14eed1c1844a1e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Feb 2023 19:06:58 -0700 Subject: [PATCH 11/13] Update README in preparation for 0.8 release (#206) --- README.md | 76 ++++++++++++++++------------------- examples/README.md | 30 +++++++++----- examples/dataframe-parquet.py | 6 +-- examples/sql-on-pandas.py | 4 +- examples/sql-on-polars.py | 4 +- examples/sql-parquet.py | 4 +- examples/substrait.py | 53 ++++++++++++++++++++++++ 7 files changed, 113 insertions(+), 64 deletions(-) create mode 100644 examples/substrait.py diff --git a/README.md b/README.md index 65f6ef3e0..e78f61370 100644 --- a/README.md +++ b/README.md @@ -24,22 +24,30 @@ This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion). -Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV -files, run it in a multi-threaded environment, and obtain the result back in Python. +DataFusion's Python bindings can be used as an end-user tool as well as providing a foundation for building new systems. -It also allows you to use UDFs and UDAFs for complex operations. 
+## Features -The major advantage of this library over other execution engines is that this library achieves zero-copy between -Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart -from having to lock the GIL when running those operations. +- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources +- Queries are optimized using DataFusion's query optimizer +- Execute user-defined Python code from SQL +- Exchange data with Pandas and other DataFrame libraries that support PyArrow +- Serialize and deserialize query plans in Substrait format +- Experimental support for executing SQL queries against Polars, Pandas and cuDF -Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions -about thread safety and lack of memory leaks. +## Comparison with other projects -There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any -drop-in replacements for Pandas. +Here is a comparison with similar projects that may help understand when DataFusion might be suitable and unsuitable +for your needs: -Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html). +- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports + very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is + written in C/C++ and it is primarily used directly by users as a serverless database and query system rather than + as a library for building such database systems. + +- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it + is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full SQL + support, nor as many extension points. 
## Example Usage @@ -50,12 +58,8 @@ The Parquet file used in this example can be downloaded from the following page: - https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page -See the [examples](examples) directory for more examples. - ```python from datafusion import SessionContext -import pandas as pd -import pyarrow as pa # Create a DataFusion context ctx = SessionContext() @@ -82,42 +86,30 @@ This produces the following chart: ![Chart](examples/chart.png) -## Substrait Support - -`arrow-datafusion-python` has bindings which allow for serializing a SQL query to substrait protobuf format and deserializing substrait protobuf bytes to a DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then be executed. +## More Examples -### Example of Serializing/Deserializing Substrait Plans - -```python -from datafusion import SessionContext -from datafusion import substrait as ss +See [examples](examples/README.md) for more information. -# Create a DataFusion context -ctx = SessionContext() +### Executing Queries with DataFusion -# Register table with context -ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv') +- [Query a Parquet file using SQL](./examples/sql-parquet.py) +- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py) +- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py) +- [Query PyArrow Data](./examples/query-pyarrow-data.py) -substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx) -# type(substrait_plan) -> +### Running User-Defined Python Code -# Alternative serialization approaches -# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely -# where they could subsequently be deserialized on the receiving end. 
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx) +- [Register a Python UDF with DataFusion](./examples/python-udf.py) +- [Register a Python UDAF with DataFusion](./examples/python-udaf.py) -# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused -# type(substrait_plan) -> -substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes) +### Substrait Support -# type(df_logical_plan) -> -df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan) +- [Serialize query plans using Substrait](./examples/substrait.py) -# Back to Substrait Plan just for demonstration purposes -# type(substrait_plan) -> -substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan) +### Executing SQL against DataFrame Libraries (Experimental) -``` +- [Executing SQL on Polars](./examples/sql-on-polars.py) +- [Executing SQL on Pandas](./examples/sql-on-pandas.py) ## How to install (from pip) diff --git a/examples/README.md b/examples/README.md index e73636642..ce98600fe 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,7 +19,7 @@ # DataFusion Python Examples -Some of the examples rely on data which can be downloaded from the following site: +Some examples rely on data which can be downloaded from the following site: - https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page @@ -27,13 +27,23 @@ Here is a direct link to the file used in the examples: - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet -## Examples +### Executing Queries with DataFusion -- [Query a Parquet file using SQL](./sql-parquet.py) -- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py) -- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py) -- [Query PyArrow Data](./query-pyarrow-data.py) -- [Register a Python UDF with DataFusion](./python-udf.py) -- [Register a Python 
UDAF with DataFusion](./python-udaf.py)
-- [Executing SQL on Polars](./sql-on-polars.py)
-- [Executing SQL on Pandas](./sql-on-pandas.py)
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./sql-on-polars.py)
+- [Executing SQL on Pandas](./sql-on-pandas.py)
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
index 31a8aa645..0f2e4b824 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/dataframe-parquet.py
@@ -19,7 +19,7 @@
 from datafusion import functions as f
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
+df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate(
+    [f.col("passenger_count")], [f.count_star()]
+)
 df.show()
diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py
index 8a2d59333..0efd77631 100644
--- a/examples/sql-on-pandas.py
+++ b/examples/sql-on-pandas.py
@@ -19,8 +19,6 @@
 
 ctx = SessionContext()
 
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
 df = ctx.sql("select passenger_count from taxi")
 print(df)
diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py
index 0173b68b4..c208114c1 100644
--- a/examples/sql-on-polars.py
+++
b/examples/sql-on-polars.py @@ -19,9 +19,7 @@ ctx = SessionContext() -ctx.register_parquet( - "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" -) +ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") df = ctx.sql( "select passenger_count, count(*) from taxi group by passenger_count" ) diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py index 7b2db6f2b..3cc9fbd5a 100644 --- a/examples/sql-parquet.py +++ b/examples/sql-parquet.py @@ -18,9 +18,7 @@ from datafusion import SessionContext ctx = SessionContext() -ctx.register_parquet( - "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet" -) +ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") df = ctx.sql( "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count" ) diff --git a/examples/substrait.py b/examples/substrait.py new file mode 100644 index 000000000..c167f7d90 --- /dev/null +++ b/examples/substrait.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from datafusion import SessionContext +from datafusion import substrait as ss + + +# Create a DataFusion context +ctx = SessionContext() + +# Register table with context +ctx.register_parquet( + "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv" +) + +substrait_plan = ss.substrait.serde.serialize_to_plan( + "SELECT * FROM aggregate_test_data", ctx +) +# type(substrait_plan) -> + +# Alternative serialization approaches +# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely +# where they could subsequently be deserialized on the receiving end. +substrait_bytes = ss.substrait.serde.serialize_bytes( + "SELECT * FROM aggregate_test_data", ctx +) + +# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused +# type(substrait_plan) -> +substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes) + +# type(df_logical_plan) -> +df_logical_plan = ss.substrait.consumer.from_substrait_plan( + ctx, substrait_plan +) + +# Back to Substrait Plan just for demonstration purposes +# type(substrait_plan) -> +substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan) From c26f13f8a7a015ed6fb16b4884a361ea27ac260a Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 21 Feb 2023 23:32:24 -0500 Subject: [PATCH 12/13] Analyze table bindings (#204) * method for getting the internal LogicalPlan instance * Add explain plan method * Add bindings for analyze table * Add to_variant * cargo fmt * blake and flake formatting --- datafusion/__init__.py | 8 ++++ datafusion/pandas.py | 1 - datafusion/polars.py | 1 - datafusion/tests/test_context.py | 1 - datafusion/tests/test_imports.py | 11 ++++- src/expr.rs | 2 + src/expr/analyze.rs | 76 ++++++++++++++++++++++++++++++++ src/sql/logical.rs | 22 +++++---- 8 files changed, 110 insertions(+), 12 deletions(-) create mode 100644 src/expr/analyze.rs diff --git a/datafusion/__init__.py 
b/datafusion/__init__.py index b6cd5178a..46206f062 100644 --- a/datafusion/__init__.py +++ b/datafusion/__init__.py @@ -41,8 +41,12 @@ ) from .expr import ( + Analyze, Expr, + Filter, + Limit, Projection, + Sort, TableScan, ) @@ -63,6 +67,10 @@ "Projection", "DFSchema", "DFField", + "Analyze", + "Sort", + "Limit", + "Filter", ] diff --git a/datafusion/pandas.py b/datafusion/pandas.py index 36e4ba2e0..f8e56512b 100644 --- a/datafusion/pandas.py +++ b/datafusion/pandas.py @@ -30,7 +30,6 @@ def register_parquet(self, name, path): self.datafusion_ctx.register_parquet(name, path) def to_pandas_expr(self, expr): - # get Python wrapper for logical expression expr = expr.to_variant() diff --git a/datafusion/polars.py b/datafusion/polars.py index e29e51156..a1bafbef8 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -31,7 +31,6 @@ def register_parquet(self, name, path): self.datafusion_ctx.register_parquet(name, path) def to_polars_expr(self, expr): - # get Python wrapper for logical expression expr = expr.to_variant() diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 6faffaf5b..efa2eded5 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -35,7 +35,6 @@ def test_create_context_no_args(): def test_create_context_with_all_valid_args(): - runtime = ( RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000) ) diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index 40b005b0d..7eb8b7cf7 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -43,6 +43,7 @@ Limit, Aggregate, Sort, + Analyze, ) @@ -68,7 +69,15 @@ def test_class_module_is_datafusion(): assert klass.__module__ == "datafusion.expr" # operators - for klass in [Projection, TableScan, Aggregate, Sort, Limit, Filter]: + for klass in [ + Projection, + TableScan, + Aggregate, + Sort, + Limit, + Filter, + Analyze, + ]: assert klass.__module__ == "datafusion.expr" 
# schema
diff --git a/src/expr.rs b/src/expr.rs
index ba01c9927..90ce6bf0e 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -31,6 +31,7 @@ use datafusion::scalar::ScalarValue;
 pub mod aggregate;
 pub mod aggregate_expr;
+pub mod analyze;
 pub mod binary_expr;
 pub mod column;
 pub mod filter;
@@ -183,5 +184,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<PyAnalyze>()?;
     Ok(())
 }
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
new file mode 100644
index 000000000..095fab037
--- /dev/null
+++ b/src/expr/analyze.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Analyze;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAnalyze {
+    analyze: Analyze,
+}
+
+impl PyAnalyze {
+    pub fn new(analyze: Analyze) -> Self {
+        Self { analyze }
+    }
+}
+
+impl From<Analyze> for PyAnalyze {
+    fn from(analyze: Analyze) -> PyAnalyze {
+        PyAnalyze { analyze }
+    }
+}
+
+impl From<PyAnalyze> for Analyze {
+    fn from(analyze: PyAnalyze) -> Self {
+        analyze.analyze
+    }
+}
+
+impl Display for PyAnalyze {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "Analyze Table")
+    }
+}
+
+#[pymethods]
+impl PyAnalyze {
+    fn verbose(&self) -> PyResult<bool> {
+        Ok(self.analyze.verbose)
+    }
+
+    /// Resulting Schema for this `Analyze` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok((*self.analyze.schema).clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Analyze({})", self))
+    }
+}
+
+impl LogicalNode for PyAnalyze {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+    }
+}
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index ce6b1fbfd..ee48f1e17 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use crate::errors::py_runtime_err;
 use crate::expr::aggregate::PyAggregate;
+use crate::expr::analyze::PyAnalyze;
 use crate::expr::filter::PyFilter;
 use crate::expr::limit::PyLimit;
 use crate::expr::projection::PyProjection;
@@ -40,6 +41,10 @@ impl PyLogicalPlan {
             plan: Arc::new(plan),
         }
     }
+
+    pub fn plan(&self) -> Arc<LogicalPlan> {
+        self.plan.clone()
+    }
 }
 
 #[pymethods]
@@ -47,12 +52,13 @@ impl PyLogicalPlan {
     /// Return the specific logical operator
     fn to_variant(&self, py: Python) -> PyResult<PyObject> {
        Python::with_gil(|_| match self.plan.as_ref() {
-            LogicalPlan::Projection(plan) =>
Ok(PyProjection::from(plan.clone()).into_py(py)),
-            LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
             LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
+            LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+            LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
             LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
+            LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
             LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)),
-            LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
+            LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
             other => Err(py_runtime_err(format!(
                 "Cannot convert this plan to a LogicalNode: {:?}",
                 other
@@ -61,7 +67,7 @@ impl PyLogicalPlan {
     }
 
     /// Get the inputs to this plan
-    pub fn inputs(&self) -> Vec<PyLogicalPlan> {
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
         let mut inputs = vec![];
         for input in self.plan.inputs() {
             inputs.push(input.to_owned().into());
@@ -73,19 +79,19 @@
         Ok(format!("{:?}", self.plan))
     }
 
-    pub fn display(&self) -> String {
+    fn display(&self) -> String {
         format!("{}", self.plan.display())
     }
 
-    pub fn display_indent(&self) -> String {
+    fn display_indent(&self) -> String {
         format!("{}", self.plan.display_indent())
     }
 
-    pub fn display_indent_schema(&self) -> String {
+    fn display_indent_schema(&self) -> String {
         format!("{}", self.plan.display_indent_schema())
     }
 
-    pub fn display_graphviz(&self) -> String {
+    fn display_graphviz(&self) -> String {
         format!("{}", self.plan.display_graphviz())
     }
 }
From 868fd51061e26d5e67f7c74e15bc58f02edc5764 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Feb 2023 05:08:41 -0700
Subject: [PATCH 13/13] changelog (#209)

---
 CHANGELOG.md | 94 +++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 63 insertions(+), 31 deletions(-)

diff --git a/CHANGELOG.md
b/CHANGELOG.md index 8d47fdf5a..f19691640 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,50 +19,36 @@ # Changelog -## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-17) +## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-22) -[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0) +[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.8.0-rc1...0.8.0) **Implemented enhancements:** -- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184) -- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181) -- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179) -- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177) -- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172) -- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158) -- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151) -- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146) -- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144) -- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140) -- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134) -- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132) -- Expand unit tests for built-in functions 
[\#128](https://github.com/apache/arrow-datafusion-python/issues/128) -- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122) -- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81) -- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72) +- Add support for cuDF physical execution engine [\#202](https://github.com/apache/arrow-datafusion-python/issues/202) +- Make it easier to create a Pandas dataframe from DataFusion query results [\#139](https://github.com/apache/arrow-datafusion-python/issues/139) **Fixed bugs:** -- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161) -- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157) -- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135) -- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130) -- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94) -- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90) -- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87) -- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84) -- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82) -- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77) +- Build error: could not compile `thiserror` due to 2 previous errors 
[\#69](https://github.com/apache/arrow-datafusion-python/issues/69) **Closed issues:** -- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39) -- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32) -- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7) +- Integrate with the new `object_store` crate [\#22](https://github.com/apache/arrow-datafusion-python/issues/22) **Merged pull requests:** +- Update README in preparation for 0.8 release [\#206](https://github.com/apache/arrow-datafusion-python/pull/206) ([andygrove](https://github.com/andygrove)) +- Add support for cudf as a physical execution engine [\#205](https://github.com/apache/arrow-datafusion-python/pull/205) ([jdye64](https://github.com/jdye64)) +- Run `maturin develop` instead of `cargo build` in verification script [\#200](https://github.com/apache/arrow-datafusion-python/pull/200) ([andygrove](https://github.com/andygrove)) +- Add tests for recently added functionality [\#199](https://github.com/apache/arrow-datafusion-python/pull/199) ([andygrove](https://github.com/andygrove)) +- Implement `to_pandas()` [\#197](https://github.com/apache/arrow-datafusion-python/pull/197) ([simicd](https://github.com/simicd)) +- Add Python wrapper for LogicalPlan::Sort [\#196](https://github.com/apache/arrow-datafusion-python/pull/196) ([andygrove](https://github.com/andygrove)) +- Add Python wrapper for LogicalPlan::Aggregate [\#195](https://github.com/apache/arrow-datafusion-python/pull/195) ([andygrove](https://github.com/andygrove)) +- Add Python wrapper for LogicalPlan::Limit [\#193](https://github.com/apache/arrow-datafusion-python/pull/193) ([andygrove](https://github.com/andygrove)) +- Add Python wrapper for LogicalPlan::Filter [\#192](https://github.com/apache/arrow-datafusion-python/pull/192) ([andygrove](https://github.com/andygrove)) +- Add experimental support 
for executing SQL with Polars and Pandas [\#190](https://github.com/apache/arrow-datafusion-python/pull/190) ([andygrove](https://github.com/andygrove)) +- Update changelog for 0.8 release [\#188](https://github.com/apache/arrow-datafusion-python/pull/188) ([andygrove](https://github.com/andygrove)) - Add ability to execute ExecutionPlan and get a stream of RecordBatch [\#186](https://github.com/apache/arrow-datafusion-python/pull/186) ([andygrove](https://github.com/andygrove)) - Dffield bindings [\#185](https://github.com/apache/arrow-datafusion-python/pull/185) ([jdye64](https://github.com/jdye64)) - Add bindings for DFSchema [\#183](https://github.com/apache/arrow-datafusion-python/pull/183) ([jdye64](https://github.com/jdye64)) @@ -118,6 +104,52 @@ - Update release instructions [\#83](https://github.com/apache/arrow-datafusion-python/pull/83) ([andygrove](https://github.com/andygrove)) - \[Functions\] - Add python function binding to `functions` [\#73](https://github.com/apache/arrow-datafusion-python/pull/73) ([francis-du](https://github.com/francis-du)) +## [0.8.0-rc1](https://github.com/apache/arrow-datafusion-python/tree/0.8.0-rc1) (2023-02-17) + +[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0-rc2...0.8.0-rc1) + +**Implemented enhancements:** + +- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184) +- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181) +- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179) +- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177) +- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172) +- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) 
[\#158](https://github.com/apache/arrow-datafusion-python/issues/158) +- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151) +- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146) +- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144) +- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140) +- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134) +- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132) +- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128) +- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122) +- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81) +- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72) + +**Fixed bugs:** + +- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161) +- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157) +- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135) +- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130) +- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94) +- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90) +- Python Release Build failing after upgrading to maturin 14.2 
[\#87](https://github.com/apache/arrow-datafusion-python/issues/87) +- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84) +- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82) +- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77) + +**Closed issues:** + +- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39) +- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32) +- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7) + +## [0.7.0-rc2](https://github.com/apache/arrow-datafusion-python/tree/0.7.0-rc2) (2022-11-26) + +[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.7.0-rc2) + ## [Unreleased](https://github.com/datafusion-contrib/datafusion-python/tree/HEAD)
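The cudf, pandas, and polars translators added in these patches, together with the `to_variant`/`inputs` bindings from patch 12, all follow one pattern: recurse into a plan's inputs first, convert each node to its Python variant, then dispatch on the variant's type. The sketch below illustrates that pattern in isolation; the `Column`, `TableScan`, and `Projection` classes here are hypothetical stand-ins for the real `datafusion.expr` wrappers, and plain dicts stand in for cudf/pandas data frames, so it runs without datafusion or cudf installed.

```python
# Minimal sketch of the recursive plan-translation pattern used by
# datafusion/cudf.py, with stand-in node classes (not the real bindings).

class Column:
    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name


class TableScan:
    def __init__(self, table_name):
        self._table_name = table_name

    def table_name(self):
        return self._table_name

    def inputs(self):
        return []  # leaf node: no child plans

    def to_variant(self):
        return self


class Projection:
    def __init__(self, exprs, child):
        self._exprs, self._child = exprs, child

    def projections(self):
        return self._exprs

    def inputs(self):
        return [self._child]

    def to_variant(self):
        return self


def translate(plan, tables):
    # recurse down first to translate inputs into "data frames" (dicts here)
    inputs = [translate(x, tables) for x in plan.inputs()]

    # get the wrapper for the logical operator node and dispatch on its type
    node = plan.to_variant()
    if isinstance(node, Projection):
        cols = [expr.name() for expr in node.projections()]
        return {c: inputs[0][c] for c in cols}
    elif isinstance(node, TableScan):
        return tables[node.table_name()]
    else:
        raise Exception("unsupported logical operator: {}".format(type(node)))


tables = {"taxi": {"passenger_count": [1, 2, 3], "fare": [5.0, 7.5, 9.0]}}
plan = Projection([Column("passenger_count")], TableScan("taxi"))
print(translate(plan, tables))  # {'passenger_count': [1, 2, 3]}
```

Extending the engine means adding one `isinstance` branch per supported `LogicalPlan` variant, which is exactly how patch 12's `to_variant` match in `src/sql/logical.rs` grows on the Rust side.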