From f9ca067b0b79896858e7d4da5bb0079ba3295ba5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 08:52:38 -0700 Subject: [PATCH 01/14] Add example showing how execute SQL with Polars --- examples/README.md | 11 ++++++ examples/sql-on-polars.py | 71 +++++++++++++++++++++++++++++++++++++++ src/expr/logical_node.rs | 2 ++ src/expr/projection.rs | 10 ++++++ src/expr/table_scan.rs | 19 +++++++++++ src/sql/logical.rs | 19 +++++++++++ 6 files changed, 132 insertions(+) create mode 100644 examples/sql-on-polars.py diff --git a/examples/README.md b/examples/README.md index a3ae0ba42..ded42d167 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,9 +19,20 @@ # DataFusion Python Examples +Some of the examples rely on data which can be downloaded from the following site: + +- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page + +Here is a direct link to the file used in the examples: + +- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet + +## Examples + - [Query a Parquet file using SQL](./sql-parquet.py) - [Query a Parquet file using the DataFrame API](./dataframe-parquet.py) - [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py) - [Query PyArrow Data](./query-pyarrow-data.py) - [Register a Python UDF with DataFusion](./python-udf.py) - [Register a Python UDAF with DataFusion](./python-udaf.py) +- [Executing SQL on Polars](./sql-on-polars.py) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py new file mode 100644 index 000000000..0cd7691b2 --- /dev/null +++ b/examples/sql-on-polars.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import polars +from datafusion import SessionContext +from datafusion.expr import Projection, TableScan, Expr +import re + +class SqlOnPolarsContext: + def __init__(self): + self.datafusion_ctx = SessionContext() + self.parquet_tables = {} + + + def register_parquet(self, name, path): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + + def to_polars_expr(self, expr): + # TODO: need python wrappers for each type of expression + if isinstance(expr, Expr): + str = "{}".format(expr) + x = re.findall("Expr\([_a-z]+\.([_a-z]+)\)", str) + print(x) + return polars.col(x[0]) + + + + def to_polars_df(self, plan): + print("to_polars_df", plan) + + # recurse down first to translate inputs into Polars data frames + inputs = [self.to_polars_df(x) for x in plan.inputs()] + + node = plan.to_logical_node() + + if isinstance(node, Projection): + args = [self.to_polars_expr(expr) for expr in node.projections()] + print(args) + return inputs[0].select(*args) + elif isinstance(node, TableScan): + return polars.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception("unsupported logical operator: {}".format(type(node))) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_polars_df(plan) + + +if __name__ == "__main__": + ctx = SqlOnPolarsContext() + ctx.register_parquet("taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet") + df = ctx.sql("select passenger_count from taxi") + print(df) diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs index 1bb3fa75f..a6e6463f2 100644 --- a/src/expr/logical_node.rs +++ b/src/expr/logical_node.rs @@ -20,6 +20,8 @@ use crate::sql::logical::PyLogicalPlan; /// Representation of a `LogicalNode` in the in overall `LogicalPlan` /// any "node" shares these common traits in common. pub trait LogicalNode { + /// Get the name of this logical node, such as "Projection", or "TableScan". + fn name(&self) -> &str; /// The input plan to the current logical node instance. fn input(&self) -> Vec; } diff --git a/src/expr/projection.rs b/src/expr/projection.rs index 6d04e59a8..e0217de6f 100644 --- a/src/expr/projection.rs +++ b/src/expr/projection.rs @@ -32,6 +32,12 @@ pub struct PyProjection { projection: Projection, } +impl PyProjection { + pub fn new(projection: Projection) -> Self { + Self { projection } + } +} + impl From for PyProjection { fn from(projection: Projection) -> PyProjection { PyProjection { projection } @@ -104,6 +110,10 @@ impl PyProjection { } impl LogicalNode for PyProjection { + fn name(&self) -> &str { + "Projection" + } + fn input(&self) -> Vec { vec![PyLogicalPlan::from((*self.projection.input).clone())] } diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs index 00504b97f..59e4e1524 100644 --- a/src/expr/table_scan.rs +++ b/src/expr/table_scan.rs @@ -19,6 +19,8 @@ use datafusion_expr::logical_plan::TableScan; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use crate::expr::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; use crate::{common::df_schema::PyDFSchema, expr::PyExpr}; #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)] @@ -27,6 +29,12 @@ pub struct PyTableScan { table_scan: TableScan, } +impl PyTableScan { + pub fn new(table_scan: TableScan) -> Self { + Self { table_scan } + } +} + impl From for TableScan { fn from(tbl_scan: PyTableScan) -> TableScan { tbl_scan.table_scan @@ -117,3 +125,14 @@ impl PyTableScan { Ok(format!("TableScan({})", self)) } } + +impl LogicalNode for PyTableScan { + fn name(&self) -> &str { + "TableScan" + } + + fn input(&self) -> Vec { + // table scans are leaf nodes and do not have inputs + vec![] + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index dcd7baa58..9e00deeb0 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,6 +17,9 @@ use std::sync::Arc; +use crate::errors::py_runtime_err; +use crate::expr::projection::PyProjection; +use crate::expr::table_scan::PyTableScan; use datafusion_expr::LogicalPlan; use pyo3::prelude::*; @@ -37,6 +40,18 @@ impl PyLogicalPlan { #[pymethods] impl PyLogicalPlan { + /// Return a Python object representation of this logical operator + fn to_logical_node(&self, py: Python) -> PyResult { + Python::with_gil(|_| match self.plan.as_ref() { + LogicalPlan::Projection(plan) => Ok(PyProjection::new(plan.clone()).into_py(py)), + LogicalPlan::TableScan(plan) => Ok(PyTableScan::new(plan.clone()).into_py(py)), + other => Err(py_runtime_err(format!( + "Cannot convert this plan to a LogicalNode: {:?}", + other + ))), + }) + } + /// Get the inputs to this plan pub fn inputs(&self) -> Vec { let mut inputs = vec![]; @@ -46,6 +61,10 @@ impl PyLogicalPlan { inputs } + fn __repr__(&self) -> PyResult { + Ok(format!("{:?}", self.plan)) + } + pub fn display(&self) -> String { format!("{}", self.plan.display()) } From d70b740979c3396cc9b04665b672f2c8bc5d2831 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 08:54:31 -0700 Subject: [PATCH 02/14] lint --- examples/sql-on-polars.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index 0cd7691b2..f8bc4333a 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -40,7 +40,6 @@ def to_polars_expr(self, expr): return polars.col(x[0]) - def to_polars_df(self, plan): print("to_polars_df", plan) From 569620113a1c1b776ca6d4cc967f5a8a536bddf5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 08:56:22 -0700 Subject: [PATCH 03/14] Remove debug prints --- examples/sql-on-polars.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index f8bc4333a..94dd0c4de 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -36,21 +36,18 @@ def to_polars_expr(self, expr): if isinstance(expr, Expr): str = "{}".format(expr) x = re.findall("Expr\([_a-z]+\.([_a-z]+)\)", str) - print(x) return polars.col(x[0]) def to_polars_df(self, plan): - print("to_polars_df", plan) - # recurse down first to translate inputs into Polars data frames inputs = [self.to_polars_df(x) for x in plan.inputs()] + # get Python wrapper for logical operator node node = plan.to_logical_node() if isinstance(node, Projection): args = [self.to_polars_expr(expr) for expr in node.projections()] - print(args) return inputs[0].select(*args) elif isinstance(node, TableScan): return polars.read_parquet(self.parquet_tables[node.table_name()]) From b0e71a78fbd01c5adda67316ce824d555308dd35 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 09:01:59 -0700 Subject: [PATCH 04/14] Add SQL on Pandas example --- examples/README.md | 1 + examples/sql-on-pandas.py | 67 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 examples/sql-on-pandas.py diff --git a/examples/README.md b/examples/README.md index ded42d167..e73636642 100644 --- a/examples/README.md +++ b/examples/README.md @@ -36,3 +36,4 @@ Here is a direct link to the file used in the examples: - [Register a Python UDF with DataFusion](./python-udf.py) - [Register a Python UDAF with DataFusion](./python-udaf.py) - [Executing SQL on Polars](./sql-on-polars.py) +- [Executing SQL on Pandas](./sql-on-pandas.py) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py new file mode 100644 index 000000000..e5409261f --- /dev/null +++ b/examples/sql-on-pandas.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pandas as pd +from datafusion import SessionContext +from datafusion.expr import Projection, TableScan, Expr +import re + +class SqlOnPandasContext: + def __init__(self): + self.datafusion_ctx = SessionContext() + self.parquet_tables = {} + + + def register_parquet(self, name, path): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + + def to_pandas_expr(self, expr): + # TODO: need python wrappers for each type of expression + if isinstance(expr, Expr): + str = "{}".format(expr) + x = re.findall("Expr\([_a-z]+\.([_a-z]+)\)", str) + return x[0] + + + def to_pandas_df(self, plan): + # recurse down first to translate inputs into pandas data frames + inputs = [self.to_pandas_df(x) for x in plan.inputs()] + + # get Python wrapper for logical operator node + node = plan.to_logical_node() + + if isinstance(node, Projection): + args = [self.to_pandas_expr(expr) for expr in node.projections()] + return inputs[0][args] + elif isinstance(node, TableScan): + return pd.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception("unsupported logical operator: {}".format(type(node))) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_pandas_df(plan) + + +if __name__ == "__main__": + ctx = SqlOnPandasContext() + ctx.register_parquet("taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet") + df = ctx.sql("select passenger_count from taxi") + print(df) From a97b702b65025d6a7764eea9a9952cb7cb0ff961 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 09:10:17 -0700 Subject: [PATCH 05/14] lint --- examples/sql-on-pandas.py | 4 ++-- examples/sql-on-polars.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py index e5409261f..1754ff660 100644 --- a/examples/sql-on-pandas.py +++ b/examples/sql-on-pandas.py @@ -32,10 +32,10 @@ def register_parquet(self, name, path): def to_pandas_expr(self, expr): - # TODO: need python wrappers for each type of expression + # TODO: need python wrappers for each type of expression - assume column for now if isinstance(expr, Expr): str = "{}".format(expr) - x = re.findall("Expr\([_a-z]+\.([_a-z]+)\)", str) + x = re.findall(r"Expr\([_a-z]+\.([_a-z]+)\)", str) return x[0] diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index 94dd0c4de..6dd734fa6 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -32,10 +32,10 @@ def register_parquet(self, name, path): def to_polars_expr(self, expr): - # TODO: need python wrappers for each type of expression + # TODO: need python wrappers for each type of expression - assume column for now if isinstance(expr, Expr): str = "{}".format(expr) - x = re.findall("Expr\([_a-z]+\.([_a-z]+)\)", str) + x = re.findall(r"Expr\([_a-z]+\.([_a-z]+)\)", str) return polars.col(x[0]) From 2463f6286d1aa5737326961b8db57a319cae8e3d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 09:11:38 -0700 Subject: [PATCH 06/14] lint --- examples/sql-on-pandas.py | 12 +++++++----- examples/sql-on-polars.py | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py index 1754ff660..bf7ba9b09 100644 --- a/examples/sql-on-pandas.py +++ b/examples/sql-on-pandas.py @@ -20,17 +20,16 @@ from datafusion.expr import Projection, TableScan, Expr import re + class SqlOnPandasContext: def __init__(self): self.datafusion_ctx = SessionContext() self.parquet_tables = {} - def register_parquet(self, name, path): self.parquet_tables[name] = path self.datafusion_ctx.register_parquet(name, path) - def to_pandas_expr(self, expr): # TODO: need python wrappers for each type of expression - assume column for now if isinstance(expr, Expr): @@ -38,7 +37,6 @@ def to_pandas_expr(self, expr): x = re.findall(r"Expr\([_a-z]+\.([_a-z]+)\)", str) return x[0] - def to_pandas_df(self, plan): # recurse down first to translate inputs into pandas data frames inputs = [self.to_pandas_df(x) for x in plan.inputs()] @@ -52,7 +50,9 @@ def to_pandas_df(self, plan): elif isinstance(node, TableScan): return pd.read_parquet(self.parquet_tables[node.table_name()]) else: - raise Exception("unsupported logical operator: {}".format(type(node))) + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) def sql(self, sql): datafusion_df = self.datafusion_ctx.sql(sql) @@ -62,6 +62,8 @@ def sql(self, sql): if __name__ == "__main__": ctx = SqlOnPandasContext() - ctx.register_parquet("taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet") + ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" + ) df = ctx.sql("select passenger_count from taxi") print(df) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index 6dd734fa6..413ff91a8 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -20,17 +20,16 @@ from datafusion.expr import Projection, TableScan, Expr import re + class SqlOnPolarsContext: def __init__(self): self.datafusion_ctx = SessionContext() self.parquet_tables = {} - def register_parquet(self, name, path): self.parquet_tables[name] = path self.datafusion_ctx.register_parquet(name, path) - def to_polars_expr(self, expr): # TODO: need python wrappers for each type of expression - assume column for now if isinstance(expr, Expr): @@ -38,7 +37,6 @@ def to_polars_expr(self, expr): x = re.findall(r"Expr\([_a-z]+\.([_a-z]+)\)", str) return polars.col(x[0]) - def to_polars_df(self, plan): # recurse down first to translate inputs into Polars data frames inputs = [self.to_polars_df(x) for x in plan.inputs()] @@ -52,7 +50,9 @@ def to_polars_df(self, plan): elif isinstance(node, TableScan): return polars.read_parquet(self.parquet_tables[node.table_name()]) else: - raise Exception("unsupported logical operator: {}".format(type(node))) + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) def sql(self, sql): datafusion_df = self.datafusion_ctx.sql(sql) @@ -62,6 +62,8 @@ def sql(self, sql): if __name__ == "__main__": ctx = SqlOnPolarsContext() - ctx.register_parquet("taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet") + ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" + ) df = ctx.sql("select passenger_count from taxi") print(df) From 025b12e5f41c5e0e83c1a7850b41250750eaec1e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 16:55:11 -0700 Subject: [PATCH 07/14] Add Column, Literal, BinaryExpr Python wrappers --- datafusion/tests/test_expr.py | 40 +++++++++++++++++++++++++++++ datafusion/tests/test_imports.py | 5 +++- src/expr.rs | 24 +++++++++++++++++ src/expr/binary_expr.rs | 44 ++++++++++++++++++++++++++++++++ src/expr/column.rs | 44 ++++++++++++++++++++++++++++++++ src/expr/literal.rs | 44 ++++++++++++++++++++++++++++++++ src/sql/logical.rs | 19 ++++++++++++++ 7 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 datafusion/tests/test_expr.py create mode 100644 src/expr/binary_expr.rs create mode 100644 src/expr/column.rs create mode 100644 src/expr/literal.rs diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py new file mode 100644 index 000000000..702d59aa4 --- /dev/null +++ b/datafusion/tests/test_expr.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion import SessionContext +from datafusion.expr import Column, Literal, BinaryExpr, Projection +import pytest + + +@pytest.fixture +def test_ctx(): + ctx = SessionContext() + ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv") + return ctx + + +def test_logical_plan(test_ctx): + df = test_ctx.sql("select c1, 123, c1 < 123 from test") + plan = df.logical_plan() + + projection = plan.to_logical_node() + assert isinstance(projection, Projection) + + expr = projection.projections() + assert isinstance(expr[0].to_logical_expr(), Column) + assert isinstance(expr[1].to_logical_expr(), Literal) + assert isinstance(expr[2].to_logical_expr(), BinaryExpr) diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index e5d958537..56cdf7081 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -33,6 +33,9 @@ from datafusion.expr import ( Expr, + Column, + Literal, + BinaryExpr, Projection, TableScan, ) @@ -55,7 +58,7 @@ def test_class_module_is_datafusion(): ]: assert klass.__module__ == "datafusion" - for klass in [Expr, Projection, TableScan]: + for klass in [Expr, Column, Literal, BinaryExpr, Projection, TableScan]: assert klass.__module__ == "datafusion.expr" for klass in [DFField, DFSchema]: diff --git a/src/expr.rs b/src/expr.rs index f3695febf..3925910c0 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -22,8 +22,15 @@ use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::PyArrowType; use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField}; +use crate::errors::py_runtime_err; +use crate::expr::binary_expr::PyBinaryExpr; +use crate::expr::column::PyColumn; +use crate::expr::literal::PyLiteral; use datafusion::scalar::ScalarValue; +pub mod binary_expr; +pub mod column; +pub mod literal; pub mod logical_node; pub mod projection; pub mod table_scan; @@ -49,6 +56,19 @@ impl From for PyExpr { #[pymethods] impl PyExpr { + /// Return a Python object representation of this logical expression + fn to_logical_expr(&self, py: Python) -> PyResult { + Python::with_gil(|_| match &self.expr { + Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)), + Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)), + Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)), + other => Err(py_runtime_err(format!( + "Cannot convert this Expr to a Python object: {:?}", + other + ))), + }) + } + fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr { let expr = match op { CompareOp::Lt => self.expr.clone().lt(other.expr), @@ -143,5 +163,9 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs new file mode 100644 index 000000000..7802e7039 --- /dev/null +++ b/src/expr/binary_expr.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_expr::BinaryExpr; +use pyo3::prelude::*; + +#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyBinaryExpr { + expr: BinaryExpr, +} + +impl From for BinaryExpr { + fn from(expr: PyBinaryExpr) -> Self { + expr.expr + } +} + +impl From for PyBinaryExpr { + fn from(expr: BinaryExpr) -> PyBinaryExpr { + PyBinaryExpr { expr } + } +} + +#[pymethods] +impl PyBinaryExpr { + fn __repr__(&self) -> PyResult { + Ok(format!("{}", self.expr)) + } +} diff --git a/src/expr/column.rs b/src/expr/column.rs new file mode 100644 index 000000000..4591fc66a --- /dev/null +++ b/src/expr/column.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::Column; +use pyo3::prelude::*; + +#[pyclass(name = "Column", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyColumn { + pub col: Column, +} + +impl PyColumn { + pub fn new(col: Column) -> Self { + Self { col } + } +} + +impl From for PyColumn { + fn from(col: Column) -> PyColumn { + PyColumn { col } + } +} + +#[pymethods] +impl PyColumn { + fn __repr__(&self) -> PyResult { + Ok(format!("{}", self.col)) + } +} diff --git a/src/expr/literal.rs b/src/expr/literal.rs new file mode 100644 index 000000000..46f86bc87 --- /dev/null +++ b/src/expr/literal.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::ScalarValue; +use pyo3::prelude::*; + +#[pyclass(name = "Literal", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyLiteral { + pub value: ScalarValue, +} + +impl From for ScalarValue { + fn from(lit: PyLiteral) -> ScalarValue { + lit.value + } +} + +impl From for PyLiteral { + fn from(value: ScalarValue) -> PyLiteral { + PyLiteral { value } + } +} + +#[pymethods] +impl PyLiteral { + fn __repr__(&self) -> PyResult { + Ok(format!("{}", self.value)) + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index dcd7baa58..3fcc046f7 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,6 +17,9 @@ use std::sync::Arc; +use crate::errors::py_runtime_err; +use crate::expr::projection::PyProjection; +use crate::expr::table_scan::PyTableScan; use datafusion_expr::LogicalPlan; use pyo3::prelude::*; @@ -37,6 +40,18 @@ impl PyLogicalPlan { #[pymethods] impl PyLogicalPlan { + /// Return a Python object representation of this logical operator + fn to_logical_node(&self, py: Python) -> PyResult { + Python::with_gil(|_| match self.plan.as_ref() { + LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), + LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), + other => Err(py_runtime_err(format!( + "Cannot convert this plan to a LogicalNode: {:?}", + other + ))), + }) + } + /// Get the inputs to this plan pub fn inputs(&self) -> Vec { let mut inputs = vec![]; @@ -46,6 +61,10 @@ impl PyLogicalPlan { inputs } + fn __repr__(&self) -> PyResult { + Ok(format!("{:?}", self.plan)) + } + pub fn display(&self) -> String { format!("{}", self.plan.display()) } From a523cff01b9c14e264888f1e981e489a154e7f8d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 17:18:51 -0700 Subject: [PATCH 08/14] better tests --- datafusion/tests/test_expr.py | 19 ++++++++++++++++--- src/expr/binary_expr.rs | 13 +++++++++++++ src/expr/column.rs | 20 ++++++++++++++++++-- src/expr/literal.rs | 30 ++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 5 deletions(-) diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py index 702d59aa4..d7bb9ce00 100644 --- a/datafusion/tests/test_expr.py +++ b/datafusion/tests/test_expr.py @@ -35,6 +35,19 @@ def test_logical_plan(test_ctx): assert isinstance(projection, Projection) expr = projection.projections() - assert isinstance(expr[0].to_logical_expr(), Column) - assert isinstance(expr[1].to_logical_expr(), Literal) - assert isinstance(expr[2].to_logical_expr(), BinaryExpr) + + col1 = expr[0].to_logical_expr() + assert isinstance(col1, Column) + assert col1.name() == "c1" + assert col1.qualified_name() == "test.c1" + + col2 = expr[1].to_logical_expr() + assert isinstance(col2, Literal) + assert col2.data_type() == "Int64" + assert col2.value_i64() == 123 + + col3 = expr[2].to_logical_expr() + assert isinstance(col3, BinaryExpr) + assert isinstance(col3.left().to_logical_expr(), Column) + assert col3.op() == "<" + assert isinstance(col3.right().to_logical_expr(), Literal) diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs index 7802e7039..5f382b770 100644 --- a/src/expr/binary_expr.rs +++ b/src/expr/binary_expr.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::expr::PyExpr; use datafusion_expr::BinaryExpr; use pyo3::prelude::*; @@ -38,6 +39,18 @@ impl From for PyBinaryExpr { #[pymethods] impl PyBinaryExpr { + fn left(&self) -> PyExpr { + self.expr.left.as_ref().clone().into() + } + + fn right(&self) -> PyExpr { + self.expr.right.as_ref().clone().into() + } + + fn op(&self) -> String { + format!("{}", self.expr.op) + } + fn __repr__(&self) -> PyResult { Ok(format!("{}", self.expr)) } diff --git a/src/expr/column.rs b/src/expr/column.rs index 4591fc66a..14f5ad282 100644 --- a/src/expr/column.rs +++ b/src/expr/column.rs @@ -38,7 +38,23 @@ impl From for PyColumn { #[pymethods] impl PyColumn { - fn __repr__(&self) -> PyResult { - Ok(format!("{}", self.col)) + /// Get the column name + fn name(&self) -> String { + self.col.name.clone() + } + + /// Get the column relation + fn relation(&self) -> Option { + self.col.relation.clone() + } + + /// Get the fully-qualified column name + fn qualified_name(&self) -> String { + self.col.flat_name().clone() + } + + /// Get a String representation of this column + fn __repr__(&self) -> String { + self.qualified_name() } } diff --git a/src/expr/literal.rs b/src/expr/literal.rs index 46f86bc87..27674ce6f 100644 --- a/src/expr/literal.rs +++ b/src/expr/literal.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::errors::py_runtime_err; use datafusion_common::ScalarValue; use pyo3::prelude::*; @@ -38,6 +39,35 @@ impl From for PyLiteral { #[pymethods] impl PyLiteral { + /// Get the data type of this literal value + fn data_type(&self) -> String { + format!("{}", self.value.get_datatype()) + } + + fn value_i32(&self) -> PyResult { + if let ScalarValue::Int32(Some(n)) = &self.value { + Ok(*n) + } else { + Err(py_runtime_err("Cannot access value as i32")) + } + } + + fn value_i64(&self) -> PyResult { + if let ScalarValue::Int64(Some(n)) = &self.value { + Ok(*n) + } else { + Err(py_runtime_err("Cannot access value as i64")) + } + } + + fn value_str(&self) -> PyResult { + if let ScalarValue::Utf8(Some(str)) = &self.value { + Ok(str.clone()) + } else { + Err(py_runtime_err("Cannot access value as string")) + } + } + fn __repr__(&self) -> PyResult { Ok(format!("{}", self.value)) } From a2b70b2c0002fce02f52a80174892372f4e338b5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 18 Feb 2023 17:24:45 -0700 Subject: [PATCH 09/14] clippy --- src/expr/column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/column.rs b/src/expr/column.rs index 14f5ad282..16b8bce3c 100644 --- a/src/expr/column.rs +++ b/src/expr/column.rs @@ -50,7 +50,7 @@ impl PyColumn { /// Get the fully-qualified column name fn qualified_name(&self) -> String { - self.col.flat_name().clone() + self.col.flat_name() } /// Get a String representation of this column From 93290e6a607b8bada54b8b73e167eb6d47107036 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 19 Feb 2023 08:07:09 -0700 Subject: [PATCH 10/14] rename method --- datafusion/tests/test_expr.py | 12 ++++++------ src/expr.rs | 4 ++-- src/sql/logical.rs | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py index d7bb9ce00..20370b8e8 100644 --- a/datafusion/tests/test_expr.py +++ b/datafusion/tests/test_expr.py @@ -31,23 +31,23 @@ def test_logical_plan(test_ctx): df = test_ctx.sql("select c1, 123, c1 < 123 from test") plan = df.logical_plan() - projection = plan.to_logical_node() + projection = plan.to_variant() assert isinstance(projection, Projection) expr = projection.projections() - col1 = expr[0].to_logical_expr() + col1 = expr[0].to_variant() assert isinstance(col1, Column) assert col1.name() == "c1" assert col1.qualified_name() == "test.c1" - col2 = expr[1].to_logical_expr() + col2 = expr[1].to_variant() assert isinstance(col2, Literal) assert col2.data_type() == "Int64" assert col2.value_i64() == 123 - col3 = expr[2].to_logical_expr() + col3 = expr[2].to_variant() assert isinstance(col3, BinaryExpr) - assert isinstance(col3.left().to_logical_expr(), Column) + assert isinstance(col3.left().to_variant(), Column) assert col3.op() == "<" - assert isinstance(col3.right().to_logical_expr(), Literal) + assert isinstance(col3.right().to_variant(), Literal) diff --git a/src/expr.rs b/src/expr.rs index 3925910c0..4b5faa191 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -56,8 +56,8 @@ impl From for PyExpr { #[pymethods] impl PyExpr { - /// Return a Python object representation of this logical expression - fn to_logical_expr(&self, py: Python) -> PyResult { + /// Return the specific expression + fn to_variant(&self, py: Python) -> PyResult { Python::with_gil(|_| match &self.expr { Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)), Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)), diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 3fcc046f7..e4bb7c670 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -40,8 +40,8 @@ impl PyLogicalPlan { #[pymethods] impl PyLogicalPlan { - /// Return a Python object representation of this logical operator - fn to_logical_node(&self, py: Python) -> PyResult { + /// Return the specific logical operator + fn to_variant(&self, py: Python) -> PyResult { Python::with_gil(|_| match self.plan.as_ref() { LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), From 116ffbb555d566b9c7dccde4abf477f4aa7812a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 19 Feb 2023 16:40:40 -0700 Subject: [PATCH 11/14] Add aggregate expressions --- datafusion/tests/test_expr.py | 23 ++++++++++- src/expr.rs | 16 ++++++-- src/expr/aggregate_expr.rs | 73 +++++++++++++++++++++++++++++++++++ src/sql/logical.rs | 6 +++ 4 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 src/expr/aggregate_expr.rs diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py index 20370b8e8..47213aaa8 100644 --- a/datafusion/tests/test_expr.py +++ b/datafusion/tests/test_expr.py @@ -16,7 +16,8 @@ # under the License. from datafusion import SessionContext -from datafusion.expr import Column, Literal, BinaryExpr, Projection +from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction +from datafusion.expr import Projection, Aggregate import pytest @@ -51,3 +52,23 @@ def test_logical_plan(test_ctx): assert isinstance(col3.left().to_variant(), Column) assert col3.op() == "<" assert isinstance(col3.right().to_variant(), Literal) + + +def test_aggregate_query(test_ctx): + df = test_ctx.sql("select c1, count(*) from test group by c1") + plan = df.logical_plan() + + projection = plan.to_variant() + assert isinstance(projection, Projection) + + aggregate = projection.input().to_variant() + assert isinstance(aggregate, Aggregate) + + col1 = aggregate.group_by_exprs()[0].to_variant() + assert isinstance(col1, Column) + assert col1.name() == "c1" + assert col1.qualified_name() == "test.c1" + + col2 = aggregate.aggregate_exprs()[0].to_variant() + assert isinstance(col2, AggregateFunction) + diff --git a/src/expr.rs b/src/expr.rs index 0c7d3823c..058509dc5 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -23,12 +23,14 @@ use datafusion::arrow::pyarrow::PyArrowType; use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField}; use crate::errors::py_runtime_err; +use crate::expr::aggregate_expr::PyAggregateFunction; use crate::expr::binary_expr::PyBinaryExpr; use crate::expr::column::PyColumn; use crate::expr::literal::PyLiteral; use datafusion::scalar::ScalarValue; pub mod aggregate; +pub mod aggregate_expr; pub mod binary_expr; pub mod column; pub mod limit; @@ -65,6 +67,9 @@ impl PyExpr { Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)), Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)), Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)), + Expr::AggregateFunction(expr) => { + Ok(PyAggregateFunction::from(expr.clone()).into_py(py)) + } other => Err(py_runtime_err(format!( "Cannot convert this Expr to a Python object: {:?}", other @@ -163,13 +168,16 @@ impl PyExpr { /// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { + // expressions m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + // operators m.add_class::()?; m.add_class::()?; - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs new file mode 100644 index 000000000..2056cbbd3 --- /dev/null +++ b/src/expr/aggregate_expr.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::expr::PyExpr; +use datafusion_expr::expr::AggregateFunction; +use pyo3::prelude::*; +use std::fmt::{Display, Formatter}; + +#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyAggregateFunction { + aggr: AggregateFunction, +} + +impl From for AggregateFunction { + fn from(aggr: PyAggregateFunction) -> Self { + aggr.aggr + } +} + +impl From for PyAggregateFunction { + fn from(aggr: AggregateFunction) -> PyAggregateFunction { + PyAggregateFunction { aggr } + } +} + +impl Display for PyAggregateFunction { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let args: Vec = self.aggr.args.iter().map(|expr| expr.to_string()).collect(); + write!(f, "{}({})", self.aggr.fun, args.join(", ")) + } +} + +#[pymethods] +impl PyAggregateFunction { + /// Get the aggregate type, such as "MIN", or "MAX" + fn aggregate_type(&self) -> String { + format!("{}", self.aggr.fun) + } + + /// is this a distinct aggregate such as `COUNT(DISTINCT expr)` + fn is_distinct(&self) -> bool { + self.aggr.distinct + } + + /// Get the arguments to the aggregate function + fn args(&self) -> Vec { + self.aggr + .args + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect() + } + + /// Get a String representation of this column + fn __repr__(&self) -> String { + format!("AggregateFunction({})", self) + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index e4bb7c670..8b4882d44 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -18,7 +18,10 @@ use std::sync::Arc; use crate::errors::py_runtime_err; +use crate::expr::aggregate::PyAggregate; +use crate::expr::limit::PyLimit; use crate::expr::projection::PyProjection; +use crate::expr::sort::PySort; use crate::expr::table_scan::PyTableScan; use datafusion_expr::LogicalPlan; use pyo3::prelude::*; @@ -45,6 +48,9 @@ impl PyLogicalPlan { Python::with_gil(|_| match self.plan.as_ref() { LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), + LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)), + LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)), + LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)), other => Err(py_runtime_err(format!( "Cannot convert this plan to a LogicalNode: {:?}", other From 4eb0864bdf46a73f7c1629bbc3a10a55c1da6b69 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Feb 2023 08:35:50 -0700 Subject: [PATCH 12/14] Refactor --- README.md | 3 ++ datafusion/pandas.py | 62 ++++++++++++++++++++++++++++++++++++++ datafusion/polars.py | 63 +++++++++++++++++++++++++++++++++++++++ examples/sql-on-pandas.py | 59 +++++------------------------------- examples/sql-on-polars.py | 59 +++++------------------------------- 5 files changed, 142 insertions(+), 104 deletions(-) create mode 100644 datafusion/pandas.py create mode 100644 datafusion/polars.py diff --git a/README.md b/README.md index ab89ff6dd..d465ebcd0 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,9 @@ from having to lock the GIL when running those operations. Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks. +There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any +drop-in replacements for Pandas. + Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html). ## Example Usage diff --git a/datafusion/pandas.py b/datafusion/pandas.py new file mode 100644 index 000000000..1bd46e6d9 --- /dev/null +++ b/datafusion/pandas.py @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pandas as pd +import datafusion +from datafusion.expr import Projection, TableScan, Column + + +class SessionContext: + def __init__(self): + self.datafusion_ctx = datafusion.SessionContext() + self.parquet_tables = {} + + def register_parquet(self, name, path): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def to_pandas_expr(self, expr): + + # get Python wrapper for logical expression + expr = expr.to_logical_expr() + + if isinstance(expr, Column): + return expr.name() + else: + raise Exception("unsupported expression: {}".format(expr)) + + def to_pandas_df(self, plan): + # recurse down first to translate inputs into pandas data frames + inputs = [self.to_pandas_df(x) for x in plan.inputs()] + + # get Python wrapper for logical operator node + node = plan.to_logical_node() + + if isinstance(node, Projection): + args = [self.to_pandas_expr(expr) for expr in node.projections()] + return inputs[0][args] + elif isinstance(node, TableScan): + return pd.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_pandas_df(plan) diff --git a/datafusion/polars.py b/datafusion/polars.py new file mode 100644 index 000000000..6ed3d868f --- /dev/null +++ b/datafusion/polars.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import polars +import datafusion +from datafusion.expr import Projection, TableScan, Aggregate, Column, AggregateFunction + + +class SessionContext: + def __init__(self): + self.datafusion_ctx = datafusion.SessionContext() + self.parquet_tables = {} + + def register_parquet(self, name, path): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def to_polars_expr(self, expr): + + # get Python wrapper for logical expression + expr = expr.to_variant() + + if isinstance(expr, Column): + return polars.col(expr.name()) + else: + raise Exception("unsupported expression: {}".format(expr)) + + def to_polars_df(self, plan): + # recurse down first to translate inputs into Polars data frames + inputs = [self.to_polars_df(x) for x in plan.inputs()] + + # get Python wrapper for logical operator node + node = plan.to_variant() + + if isinstance(node, Projection): + args = [self.to_polars_expr(expr) for expr in node.projections()] + return inputs[0].select(*args) + elif isinstance(node, TableScan): + return polars.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_polars_df(plan) + diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py index 27b45584f..8a2d59333 100644 --- a/examples/sql-on-pandas.py +++ b/examples/sql-on-pandas.py @@ -15,57 +15,12 @@ # specific language governing permissions and limitations # under the License. -import pandas as pd -from datafusion import SessionContext -from datafusion.expr import Projection, TableScan, Column +from datafusion.pandas import SessionContext -class SqlOnPandasContext: - def __init__(self): - self.datafusion_ctx = SessionContext() - self.parquet_tables = {} - - def register_parquet(self, name, path): - self.parquet_tables[name] = path - self.datafusion_ctx.register_parquet(name, path) - - def to_pandas_expr(self, expr): - - # get Python wrapper for logical expression - expr = expr.to_logical_expr() - - if isinstance(expr, Column): - return expr.name() - else: - raise Exception("unsupported expression: {}".format(expr)) - - def to_pandas_df(self, plan): - # recurse down first to translate inputs into pandas data frames - inputs = [self.to_pandas_df(x) for x in plan.inputs()] - - # get Python wrapper for logical operator node - node = plan.to_logical_node() - - if isinstance(node, Projection): - args = [self.to_pandas_expr(expr) for expr in node.projections()] - return inputs[0][args] - elif isinstance(node, TableScan): - return pd.read_parquet(self.parquet_tables[node.table_name()]) - else: - raise Exception( - "unsupported logical operator: {}".format(type(node)) - ) - - def sql(self, sql): - datafusion_df = self.datafusion_ctx.sql(sql) - plan = datafusion_df.logical_plan() - return self.to_pandas_df(plan) - - -if __name__ == "__main__": - ctx = SqlOnPandasContext() - ctx.register_parquet( - "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" - ) - df = ctx.sql("select passenger_count from taxi") - print(df) +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index e65062b37..996e84c34 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -15,57 +15,12 @@ # specific language governing permissions and limitations # under the License. -import polars -from datafusion import SessionContext -from datafusion.expr import Projection, TableScan, Column +from datafusion.polars import SessionContext -class SqlOnPolarsContext: - def __init__(self): - self.datafusion_ctx = SessionContext() - self.parquet_tables = {} - - def register_parquet(self, name, path): - self.parquet_tables[name] = path - self.datafusion_ctx.register_parquet(name, path) - - def to_polars_expr(self, expr): - - # get Python wrapper for logical expression - expr = expr.to_logical_expr() - - if isinstance(expr, Column): - return polars.col(expr.name()) - else: - raise Exception("unsupported expression: {}".format(expr)) - - def to_polars_df(self, plan): - # recurse down first to translate inputs into Polars data frames - inputs = [self.to_polars_df(x) for x in plan.inputs()] - - # get Python wrapper for logical operator node - node = plan.to_logical_node() - - if isinstance(node, Projection): - args = [self.to_polars_expr(expr) for expr in node.projections()] - return inputs[0].select(*args) - elif isinstance(node, TableScan): - return polars.read_parquet(self.parquet_tables[node.table_name()]) - else: - raise Exception( - "unsupported logical operator: {}".format(type(node)) - ) - - def sql(self, sql): - datafusion_df = self.datafusion_ctx.sql(sql) - plan = datafusion_df.logical_plan() - return self.to_polars_df(plan) - - -if __name__ == "__main__": - ctx = SqlOnPolarsContext() - ctx.register_parquet( - "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" - ) - df = ctx.sql("select passenger_count from taxi") - print(df) +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df) From b4765821ea51368275ef7c97b9ec38116808fe84 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Feb 2023 09:42:58 -0700 Subject: [PATCH 13/14] Polars simple aggregate query support --- datafusion/pandas.py | 4 ++-- datafusion/polars.py | 26 +++++++++++++++++++++++++- examples/sql-on-polars.py | 4 +++- src/expr/aggregate_expr.rs | 2 +- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/datafusion/pandas.py b/datafusion/pandas.py index 1bd46e6d9..36e4ba2e0 100644 --- a/datafusion/pandas.py +++ b/datafusion/pandas.py @@ -32,7 +32,7 @@ def register_parquet(self, name, path): def to_pandas_expr(self, expr): # get Python wrapper for logical expression - expr = expr.to_logical_expr() + expr = expr.to_variant() if isinstance(expr, Column): return expr.name() @@ -44,7 +44,7 @@ def to_pandas_df(self, plan): inputs = [self.to_pandas_df(x) for x in plan.inputs()] # get Python wrapper for logical operator node - node = plan.to_logical_node() + node = plan.to_variant() if isinstance(node, Projection): args = [self.to_pandas_expr(expr) for expr in node.projections()] diff --git a/datafusion/polars.py b/datafusion/polars.py index a8790dd41..f08334e04 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -17,7 +17,8 @@ import polars import datafusion -from datafusion.expr import Projection, TableScan, Column +from datafusion.expr import Projection, TableScan, Aggregate +from datafusion.expr import Column, AggregateFunction class SessionContext: @@ -49,6 +50,29 @@ def to_polars_df(self, plan): if isinstance(node, Projection): args = [self.to_polars_expr(expr) for expr in node.projections()] return inputs[0].select(*args) + elif isinstance(node, Aggregate): + groupby_expr = [ + self.to_polars_expr(expr) for expr in node.group_by_exprs() + ] + aggs = [] + for expr in node.aggregate_exprs(): + expr = expr.to_variant() + if isinstance(expr, AggregateFunction): + if expr.aggregate_type() == "COUNT": + aggs.append(polars.count().alias("{}".format(expr))) + else: + raise Exception( + "Unsupported aggregate function {}".format( + expr.aggregate_type() + ) + ) + else: + raise Exception( + "Unsupported aggregate function {}".format(expr) + ) + df = inputs[0].groupby(groupby_expr).agg(aggs) + print(df) + return df elif isinstance(node, TableScan): return polars.read_parquet(self.parquet_tables[node.table_name()]) else: diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index 996e84c34..0173b68b4 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -22,5 +22,7 @@ ctx.register_parquet( "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet" ) -df = ctx.sql("select passenger_count from taxi") +df = ctx.sql( + "select passenger_count, count(*) from taxi group by passenger_count" +) print(df) diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs index 2056cbbd3..180105180 100644 --- a/src/expr/aggregate_expr.rs +++ b/src/expr/aggregate_expr.rs @@ -68,6 +68,6 @@ impl PyAggregateFunction { /// Get a String representation of this column fn __repr__(&self) -> String { - format!("AggregateFunction({})", self) + format!("{}", self) } } From b3aa6ea54f9b8db2454d6c17a34dd535d4e50987 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 20 Feb 2023 09:43:26 -0700 Subject: [PATCH 14/14] remove print --- datafusion/polars.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/polars.py b/datafusion/polars.py index f08334e04..e29e51156 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -71,7 +71,6 @@ def to_polars_df(self, plan): "Unsupported aggregate function {}".format(expr) ) df = inputs[0].groupby(groupby_expr).agg(aggs) - print(df) return df elif isinstance(node, TableScan): return polars.read_parquet(self.parquet_tables[node.table_name()])