diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d47fdf5a..f19691640 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,50 +19,36 @@
 # Changelog
 
-## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-17)
+## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-22)
 
-[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0)
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.8.0-rc1...0.8.0)
 
 **Implemented enhancements:**
 
-- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
-- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
-- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
-- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
-- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
-- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
-- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
-- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
-- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
-- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
-- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
-- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
-- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
-- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
-- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
-- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+- Add support for cuDF physical execution engine [\#202](https://github.com/apache/arrow-datafusion-python/issues/202)
+- Make it easier to create a Pandas dataframe from DataFusion query results [\#139](https://github.com/apache/arrow-datafusion-python/issues/139)
 
 **Fixed bugs:**
 
-- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
-- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
-- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
-- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
-- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
-- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
-- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
-- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
-- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
-- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+- Build error: could not compile `thiserror` due to 2 previous errors [\#69](https://github.com/apache/arrow-datafusion-python/issues/69)
 
 **Closed issues:**
 
-- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
-- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
-- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+- Integrate with the new `object_store` crate [\#22](https://github.com/apache/arrow-datafusion-python/issues/22)
 
 **Merged pull requests:**
 
+- Update README in preparation for 0.8 release [\#206](https://github.com/apache/arrow-datafusion-python/pull/206) ([andygrove](https://github.com/andygrove))
+- Add support for cudf as a physical execution engine [\#205](https://github.com/apache/arrow-datafusion-python/pull/205) ([jdye64](https://github.com/jdye64))
+- Run `maturin develop` instead of `cargo build` in verification script [\#200](https://github.com/apache/arrow-datafusion-python/pull/200) ([andygrove](https://github.com/andygrove))
+- Add tests for recently added functionality [\#199](https://github.com/apache/arrow-datafusion-python/pull/199) ([andygrove](https://github.com/andygrove))
+- Implement `to_pandas()` [\#197](https://github.com/apache/arrow-datafusion-python/pull/197) ([simicd](https://github.com/simicd))
+- Add Python wrapper for LogicalPlan::Sort [\#196](https://github.com/apache/arrow-datafusion-python/pull/196) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Aggregate [\#195](https://github.com/apache/arrow-datafusion-python/pull/195) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Limit [\#193](https://github.com/apache/arrow-datafusion-python/pull/193) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Filter [\#192](https://github.com/apache/arrow-datafusion-python/pull/192) ([andygrove](https://github.com/andygrove))
+- Add experimental support for executing SQL with Polars and Pandas [\#190](https://github.com/apache/arrow-datafusion-python/pull/190) ([andygrove](https://github.com/andygrove))
+- Update changelog for 0.8 release [\#188](https://github.com/apache/arrow-datafusion-python/pull/188) ([andygrove](https://github.com/andygrove))
 - Add ability to execute ExecutionPlan and get a stream of RecordBatch [\#186](https://github.com/apache/arrow-datafusion-python/pull/186) ([andygrove](https://github.com/andygrove))
 - Dffield bindings [\#185](https://github.com/apache/arrow-datafusion-python/pull/185) ([jdye64](https://github.com/jdye64))
 - Add bindings for DFSchema [\#183](https://github.com/apache/arrow-datafusion-python/pull/183) ([jdye64](https://github.com/jdye64))
@@ -118,6 +104,52 @@
 - Update release instructions [\#83](https://github.com/apache/arrow-datafusion-python/pull/83) ([andygrove](https://github.com/andygrove))
 - \[Functions\] - Add python function binding to `functions` [\#73](https://github.com/apache/arrow-datafusion-python/pull/73) ([francis-du](https://github.com/francis-du))
 
+## [0.8.0-rc1](https://github.com/apache/arrow-datafusion-python/tree/0.8.0-rc1) (2023-02-17)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0-rc2...0.8.0-rc1)
+
+**Implemented enhancements:**
+
+- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
+- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
+- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
+- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
+- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
+- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
+- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
+- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
+- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
+- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
+- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
+- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
+- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
+- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
+- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
+- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+
+**Fixed bugs:**
+
+- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
+- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
+- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
+- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
+- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
+- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
+- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
+- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
+- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
+- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+
+**Closed issues:**
+
+- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
+- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
+- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+
+## [0.7.0-rc2](https://github.com/apache/arrow-datafusion-python/tree/0.7.0-rc2) (2022-11-26)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.7.0-rc2)
+
 ## [Unreleased](https://github.com/datafusion-contrib/datafusion-python/tree/HEAD)
diff --git a/Cargo.lock b/Cargo.lock
index 5059afaac..04a2ea8d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1134,9 +1134,9 @@ dependencies = [
 
 [[package]]
 name = "http"
-version = "0.2.8"
+version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
+checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
 dependencies = [
  "bytes",
  "fnv",
@@ -1385,9 +1385,9 @@ checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
 
 [[package]]
 name = "libflate"
-version = "1.2.0"
+version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0"
 dependencies = [
  "adler32",
  "crc32fast",
@@ -1396,9 +1396,9 @@ dependencies = [
 
 [[package]]
 name = "libflate_lz77"
-version = "1.1.0"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf"
 dependencies = [
  "rle-decode-fast",
 ]
@@ -2359,9 +2359,9 @@ dependencies = [
 
 [[package]]
 name = "slab"
-version = "0.4.7"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
+checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
 dependencies = [
  "autocfg",
 ]
@@ -2635,9 +2635,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
 dependencies = [
  "futures-core",
  "pin-project-lite",
diff --git a/README.md b/README.md
index ab89ff6dd..e78f61370 100644
--- a/README.md
+++ b/README.md
@@ -24,19 +24,30 @@
 This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
 
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
-files, run it in a multi-threaded environment, and obtain the result back in Python.
+DataFusion's Python bindings can be used as an end-user tool as well as providing a foundation for building new systems.
 
-It also allows you to use UDFs and UDAFs for complex operations.
+## Features
 
-The major advantage of this library over other execution engines is that this library achieves zero-copy between
-Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
-from having to lock the GIL when running those operations.
+- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources
+- Queries are optimized using DataFusion's query optimizer
+- Execute user-defined Python code from SQL
+- Exchange data with Pandas and other DataFrame libraries that support PyArrow
+- Serialize and deserialize query plans in Substrait format
+- Experimental support for executing SQL queries against Polars, Pandas and cuDF
 
-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
-about thread safety and lack of memory leaks.
+## Comparison with other projects
 
-Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
+Here is a comparison with similar projects that may help understand when DataFusion might be suitable and unsuitable
+for your needs:
+
+- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports
+  very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is
+  written in C/C++ and it is primarily used directly by users as a serverless database and query system rather than
+  as a library for building such database systems.
+
+- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it
+  is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full SQL
+  support, nor as many extension points.
 
 ## Example Usage
 
@@ -47,12 +58,8 @@
 The Parquet file used in this example can be downloaded from the following page:
 
 - https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
-See the [examples](examples) directory for more examples.
-
 ```python
 from datafusion import SessionContext
-import pandas as pd
-import pyarrow as pa
 
 # Create a DataFusion context
 ctx = SessionContext()
@@ -67,17 +74,11 @@
 df = ctx.sql("select passenger_count, count(*) "
              "group by passenger_count "
              "order by passenger_count")
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
 fig.savefig('chart.png')
 ```
 
@@ -85,42 +86,30 @@
 This produces the following chart:
 
 ![Chart](examples/chart.png)
 
-## Substrait Support
+## More Examples
 
-`arrow-datafusion-python` has bindings which allow for serializing a SQL query to substrait protobuf format and deserializing substrait protobuf bytes to a DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then be executed.
+See [examples](examples/README.md) for more information.
 
-### Example of Serializing/Deserializing Substrait Plans
+### Executing Queries with DataFusion
 
-```python
-from datafusion import SessionContext
-from datafusion import substrait as ss
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
 
-# Create a DataFusion context
-ctx = SessionContext()
-
-# Register table with context
-ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv')
-
-substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
-# type(substrait_plan) ->
+### Running User-Defined Python Code
 
-# Alternative serialization approaches
-# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
 
-# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
-# type(substrait_plan) ->
-substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+### Substrait Support
 
-# type(df_logical_plan) ->
-df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan)
+- [Serialize query plans using Substrait](./examples/substrait.py)
 
-# Back to Substrait Plan just for demonstration purposes
-# type(substrait_plan) ->
-substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
+### Executing SQL against DataFrame Libraries (Experimental)
 
-```
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
 
 ## How to install (from pip)
diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml
index 0e17e1699..d9405e4fe 100644
--- a/conda/environments/datafusion-dev.yaml
+++ b/conda/environments/datafusion-dev.yaml
@@ -28,7 +28,7 @@ dependencies:
 - pytest
 - toml
 - importlib_metadata
-- python>=3.7,<3.11
+- python>=3.10
 # Packages useful for building distributions and releasing
 - mamba
 - conda-build
@@ -38,4 +38,7 @@
 - pydata-sphinx-theme==0.8.0
 - myst-parser
 - jinja2
+# GPU packages
+- cudf
+- cudatoolkit=11.8
 name: datafusion-dev
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index b6cd5178a..46206f062 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -41,8 +41,12 @@
 )
 
 from .expr import (
+    Analyze,
     Expr,
+    Filter,
+    Limit,
     Projection,
+    Sort,
     TableScan,
 )
 
@@ -63,6 +67,10 @@
     "Projection",
     "DFSchema",
     "DFField",
+    "Analyze",
+    "Sort",
+    "Limit",
+    "Filter",
 ]
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
new file mode 100644
index 000000000..c38819c62
--- /dev/null
+++ b/datafusion/cudf.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cudf
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_cudf_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_cudf_df(self, plan):
+        # recurse down first to translate inputs into cudf data frames
+        inputs = [self.to_cudf_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_cudf_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return cudf.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_cudf_df(plan)
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
new file mode 100644
index 000000000..f8e56512b
--- /dev/null
+++ b/datafusion/pandas.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pandas as pd
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_pandas_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_pandas_df(self, plan):
+        # recurse down first to translate inputs into pandas data frames
+        inputs = [self.to_pandas_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_pandas_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return pd.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
new file mode 100644
index 000000000..a1bafbef8
--- /dev/null
+++ b/datafusion/polars.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import polars
+import datafusion
+from datafusion.expr import Projection, TableScan, Aggregate
+from datafusion.expr import Column, AggregateFunction
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_polars_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return polars.col(expr.name())
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_polars_df(self, plan):
+        # recurse down first to translate inputs into Polars data frames
+        inputs = [self.to_polars_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_polars_expr(expr) for expr in node.projections()]
+            return inputs[0].select(*args)
+        elif isinstance(node, Aggregate):
+            groupby_expr = [
+                self.to_polars_expr(expr) for expr in node.group_by_exprs()
+            ]
+            aggs = []
+            for expr in node.aggregate_exprs():
+                expr = expr.to_variant()
+                if isinstance(expr, AggregateFunction):
+                    if expr.aggregate_type() == "COUNT":
+                        aggs.append(polars.count().alias("{}".format(expr)))
+                    else:
+                        raise Exception(
+                            "Unsupported aggregate function {}".format(
+                                expr.aggregate_type()
+                            )
+                        )
+                else:
+                    raise Exception(
+                        "Unsupported aggregate expression {}".format(expr)
+                    )
+            df = inputs[0].groupby(groupby_expr).agg(aggs)
+            return df
+        elif isinstance(node, TableScan):
+            return polars.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_polars_df(plan)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 6faffaf5b..efa2eded5 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -35,7 +35,6 @@ def test_create_context_no_args():
 
 
 def test_create_context_with_all_valid_args():
-
     runtime = (
         RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
     )
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 18946888f..292a4b00c 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -533,3 +533,14 @@
 def test_count(df):
     # Get number of rows
     assert df.count() == 3
+
+
+def test_to_pandas(df):
+    # Skip test if pandas is not installed
+    pd = pytest.importorskip("pandas")
+
+    # Convert datafusion dataframe to pandas dataframe
+    pandas_df = df.to_pandas()
+    assert type(pandas_df) == pd.DataFrame
+    assert pandas_df.shape == (3, 3)
+    assert set(pandas_df.columns) == {"a", "b", "c"}
diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
new file mode 100644
index 000000000..143eea6ff
--- /dev/null
+++ b/datafusion/tests/test_expr.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction
+from datafusion.expr import (
+    Projection,
+    Filter,
+    Aggregate,
+    Limit,
+    Sort,
+    TableScan,
+)
+import pytest
+
+
+@pytest.fixture
+def test_ctx():
+    ctx = SessionContext()
+    ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
+    return ctx
+
+
+def test_projection(test_ctx):
+    df = test_ctx.sql("select c1, 123, c1 < 123 from test")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    expr = plan.projections()
+
+    col1 = expr[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    col2 = expr[1].to_variant()
+    assert isinstance(col2, Literal)
+    assert col2.data_type() == "Int64"
+    assert col2.value_i64() == 123
+
+    col3 = expr[2].to_variant()
+    assert isinstance(col3, BinaryExpr)
+    assert isinstance(col3.left().to_variant(), Column)
+    assert col3.op() == "<"
+    assert isinstance(col3.right().to_variant(), Literal)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, TableScan)
+
+
+def test_filter(test_ctx):
+    df = test_ctx.sql("select c1 from test WHERE c1 > 5")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, Filter)
+
+
+def test_limit(test_ctx):
+    df = test_ctx.sql("select c1 from test LIMIT 10")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Limit)
+
+
+def test_aggregate_query(test_ctx):
+    df = test_ctx.sql("select c1, count(*) from test group by c1")
+    plan = df.logical_plan()
+
+    projection = plan.to_variant()
+    assert isinstance(projection, Projection)
+
+    aggregate = projection.input().to_variant()
+    assert isinstance(aggregate, Aggregate)
+
+    col1 = aggregate.group_by_exprs()[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    col2 = aggregate.aggregate_exprs()[0].to_variant()
+    assert isinstance(col2, AggregateFunction)
+
+
+def test_sort(test_ctx):
+    df = test_ctx.sql("select c1 from test order by c1")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Sort)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index e5d958537..7eb8b7cf7 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -33,8 +33,17 @@
 
 from datafusion.expr import (
     Expr,
+    Column,
+    Literal,
+    BinaryExpr,
+    AggregateFunction,
     Projection,
     TableScan,
+    Filter,
+    Limit,
+    Aggregate,
+    Sort,
+    Analyze,
 )
 
@@ -55,9 +64,23 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"
 
-    for klass in [Expr, Projection, TableScan]:
+    # expressions
+    for klass in [Expr, Column, Literal, BinaryExpr, AggregateFunction]:
         assert klass.__module__ == "datafusion.expr"
 
+    # operators
+    for klass in [
+        Projection,
+        TableScan,
+        Aggregate,
+        Sort,
+        Limit,
+        Filter,
+        Analyze,
+    ]:
+        assert klass.__module__ == "datafusion.expr"
+
+    # schema
     for klass in [DFField, DFSchema]:
         assert klass.__module__ == "datafusion.common"
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index fee276c11..be86f69e0 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -125,15 +125,19 @@ test_source_distribution() {
   git clone https://github.com/apache/arrow-testing.git testing
   git clone https://github.com/apache/parquet-testing.git parquet-testing
 
-  cargo build
-  cargo test --all
+  python3 -m venv venv
+  source venv/bin/activate
+  python3 -m pip install -U pip
+  python3 -m pip install -r requirements-310.txt
+  maturin develop
+
+  #TODO: we should really run tests here as well
+  #python3 -m pytest
 
   if ( find -iname 'Cargo.toml' | xargs grep SNAPSHOT ); then
     echo "Cargo.toml version should not contain SNAPSHOT for releases"
     exit 1
   fi
-
-  cargo publish --dry-run
 }
 
 TEST_SUCCESS=no
diff --git a/examples/README.md b/examples/README.md
index a3ae0ba42..ce98600fe 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -19,9 +19,31 @@
 # DataFusion Python Examples
 
-- [Query a Parquet file using SQL](./sql-parquet.py)
-- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
-- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
-- [Query PyArrow Data](./query-pyarrow-data.py)
-- [Register a Python UDF with DataFusion](./python-udf.py)
-- [Register a Python UDAF with DataFusion](./python-udaf.py)
+Some examples rely on data which can be downloaded from the following site:
+
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+Here is a direct link to the file used in the examples:
+
+- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
+
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./examples/substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
index 31a8aa645..0f2e4b824 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/dataframe-parquet.py
@@ -19,7 +19,7 @@
 from datafusion import functions as f
 
 ctx = SessionContext()
-df
= ctx.read_parquet( - "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet" -).aggregate([f.col("passenger_count")], [f.count_star()]) +df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate( + [f.col("passenger_count")], [f.count_star()] +) df.show() diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py new file mode 100644 index 000000000..407cb1f00 --- /dev/null +++ b/examples/sql-on-cudf.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion.cudf import SessionContext + + +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/home/jeremy/Downloads/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py new file mode 100644 index 000000000..0efd77631 --- /dev/null +++ b/examples/sql-on-pandas.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion.pandas import SessionContext + + +ctx = SessionContext() +ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +df = ctx.sql("select passenger_count from taxi") +print(df) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py new file mode 100644 index 000000000..c208114c1 --- /dev/null +++ b/examples/sql-on-polars.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+from datafusion.polars import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql(
+    "select passenger_count, count(*) from taxi group by passenger_count"
+)
+print(df)
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
index 7b2db6f2b..3cc9fbd5a 100644
--- a/examples/sql-parquet.py
+++ b/examples/sql-parquet.py
@@ -18,9 +18,7 @@
 from datafusion import SessionContext
 
 ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
 df = ctx.sql(
     "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
 )
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
index 3569e6d8c..3e99b22de 100644
--- a/examples/sql-to-pandas.py
+++ b/examples/sql-to-pandas.py
@@ -33,17 +33,11 @@
     "order by passenger_count"
 )
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(
+fig = pandas_df.plot(
     kind="bar", title="Trip Count by Number of Passengers"
 ).get_figure()
 fig.savefig("chart.png")
diff --git a/examples/substrait.py b/examples/substrait.py
new file mode 100644
index 000000000..c167f7d90
--- /dev/null
+++ b/examples/substrait.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import substrait as ss
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_csv(
+    "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv"
+)
+
+substrait_plan = ss.substrait.serde.serialize_to_plan(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+# type(substrait_plan) ->
+
+# Alternative serialization approaches
+# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely
+# where they could subsequently be deserialized on the receiving end.
+substrait_bytes = ss.substrait.serde.serialize_bytes(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+
+# Imagine here bytes would be read from network, file, etc ... for brevity in this example that is omitted and the variable is simply reused
+# type(substrait_plan) ->
+substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+
+# type(df_logical_plan) ->
+df_logical_plan = ss.substrait.consumer.from_substrait_plan(
+    ctx, substrait_plan
+)
+
+# Back to Substrait Plan just for demonstration purposes
+# type(substrait_plan) ->
+substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 4b9fbca6c..a1c68dd1c 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -313,6 +313,24 @@ impl PyDataFrame {
         Ok(())
     }
 
+    /// Convert to pandas dataframe with pyarrow
+    /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
+    fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+        let batches = self.collect(py)?;
+
+        Python::with_gil(|py| {
+            // Instantiate pyarrow Table object and use its from_batches method
+            let table_class = py.import("pyarrow")?.getattr("Table")?;
+            let args = PyTuple::new(py, batches);
+            let table: PyObject = table_class.call_method1("from_batches", args)?.into();
+
+            // Use Table.to_pandas() method to convert batches to pandas dataframe
+            // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
+            let result = table.call_method0(py, "to_pandas")?;
+            Ok(result)
+        })
+    }
+
     // Executes this DataFrame to get the total number of rows.
     fn count(&self, py: Python) -> PyResult<usize> {
         Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
diff --git a/src/expr.rs b/src/expr.rs
index f3695febf..90ce6bf0e 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -22,10 +22,24 @@
 use datafusion::arrow::datatypes::DataType;
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate_expr::PyAggregateFunction;
+use crate::expr::binary_expr::PyBinaryExpr;
+use crate::expr::column::PyColumn;
+use crate::expr::literal::PyLiteral;
 use datafusion::scalar::ScalarValue;
 
+pub mod aggregate;
+pub mod aggregate_expr;
+pub mod analyze;
+pub mod binary_expr;
+pub mod column;
+pub mod filter;
+pub mod limit;
+pub mod literal;
 pub mod logical_node;
 pub mod projection;
+pub mod sort;
 pub mod table_scan;
 
 /// A PyExpr that can be used on a DataFrame
@@ -49,6 +63,22 @@ impl From<Expr> for PyExpr {
 
 #[pymethods]
 impl PyExpr {
+    /// Return the specific expression
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match &self.expr {
+            Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)),
+            Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)),
+            Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)),
+            Expr::AggregateFunction(expr) => {
+                Ok(PyAggregateFunction::from(expr.clone()).into_py(py))
+            }
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this Expr to a Python object: {:?}",
+                other
+            ))),
+        })
+    }
+
     fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
         let expr = match op {
             CompareOp::Lt => self.expr.clone().lt(other.expr),
@@ -140,8 +170,20 @@ impl PyExpr {
 
 /// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
 pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+    // expressions
     m.add_class::<PyExpr>()?;
+    m.add_class::<PyColumn>()?;
+    m.add_class::<PyLiteral>()?;
+    m.add_class::<PyBinaryExpr>()?;
+    m.add_class::<PyAggregateFunction>()?;
+    // operators
     m.add_class::<PyProjection>()?;
     m.add_class::<PyTableScan>()?;
+    m.add_class::<PyAggregate>()?;
+    m.add_class::<PySort>()?;
+    m.add_class::<PyLimit>()?;
+    m.add_class::<PyFilter>()?;
+    m.add_class::<PyAnalyze>()?;
     Ok(())
 }
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
new file mode 100644
index 000000000..98d1f554b
--- /dev/null
+++ b/src/expr/aggregate.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Aggregate;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Aggregate", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregate {
+    aggregate: Aggregate,
+}
+
+impl From<Aggregate> for PyAggregate {
+    fn from(aggregate: Aggregate) -> PyAggregate {
+        PyAggregate { aggregate }
+    }
+}
+
+impl TryFrom<PyAggregate> for Aggregate {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
+        Ok(agg.aggregate)
+    }
+}
+
+impl Display for PyAggregate {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Aggregate
+            \nGroupBy(s): {:?}
+            \nAggregate(s): {:?}
+            \nInput: {:?}
+            \nProjected Schema: {:?}",
+            &self.aggregate.group_expr,
+            &self.aggregate.aggr_expr,
+            self.aggregate.input,
+            self.aggregate.schema
+        )
+    }
+}
+
+#[pymethods]
+impl PyAggregate {
+    /// Retrieves the grouping expressions for this `Aggregate`
+    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .group_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the aggregate expressions for this `Aggregate`
+    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .aggr_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    // Retrieves the input `LogicalPlan` to this `Aggregate` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.aggregate.input).clone())
+    }
+
+    // Resulting Schema for this `Aggregate` node instance
+    fn schema(&self) -> PyDFSchema {
+        (*self.aggregate.schema).clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Aggregate({})", self))
+    }
+}
+
+impl LogicalNode for PyAggregate {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
+    }
+}
diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs
new file mode 100644
index 000000000..180105180
--- /dev/null
+++ b/src/expr/aggregate_expr.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::expr::AggregateFunction;
+use pyo3::prelude::*;
+use std::fmt::{Display, Formatter};
+
+#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregateFunction {
+    aggr: AggregateFunction,
+}
+
+impl From<PyAggregateFunction> for AggregateFunction {
+    fn from(aggr: PyAggregateFunction) -> Self {
+        aggr.aggr
+    }
+}
+
+impl From<AggregateFunction> for PyAggregateFunction {
+    fn from(aggr: AggregateFunction) -> PyAggregateFunction {
+        PyAggregateFunction { aggr }
+    }
+}
+
+impl Display for PyAggregateFunction {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
+        write!(f, "{}({})", self.aggr.fun, args.join(", "))
+    }
+}
+
+#[pymethods]
+impl PyAggregateFunction {
+    /// Get the aggregate type, such as "MIN", or "MAX"
+    fn aggregate_type(&self) -> String {
+        format!("{}", self.aggr.fun)
+    }
+
+    /// Is this a distinct aggregate such as `COUNT(DISTINCT expr)`
+    fn is_distinct(&self) -> bool {
+        self.aggr.distinct
+    }
+
+    /// Get the arguments to the aggregate function
+    fn args(&self) -> Vec<PyExpr> {
+        self.aggr
+            .args
+            .iter()
+            .map(|expr| PyExpr::from(expr.clone()))
+            .collect()
+    }
+
+    /// Get a String representation of this aggregate function
+    fn __repr__(&self) -> String {
+        format!("{}", self)
+    }
+}
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
new file mode 100644
index 000000000..095fab037
--- /dev/null
+++ b/src/expr/analyze.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Analyze;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAnalyze {
+    analyze: Analyze,
+}
+
+impl PyAnalyze {
+    pub fn new(analyze: Analyze) -> Self {
+        Self { analyze }
+    }
+}
+
+impl From<Analyze> for PyAnalyze {
+    fn from(analyze: Analyze) -> PyAnalyze {
+        PyAnalyze { analyze }
+    }
+}
+
+impl From<PyAnalyze> for Analyze {
+    fn from(analyze: PyAnalyze) -> Self {
+        analyze.analyze
+    }
+}
+
+impl Display for PyAnalyze {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "Analyze Table")
+    }
+}
+
+#[pymethods]
+impl PyAnalyze {
+    fn verbose(&self) -> PyResult<bool> {
+        Ok(self.analyze.verbose)
+    }
+
+    /// Resulting Schema for this `Analyze` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok((*self.analyze.schema).clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Analyze({})", self))
+    }
+}
+
+impl LogicalNode for PyAnalyze {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+    }
+}
diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs
new file mode 100644
index 000000000..5f382b770
--- /dev/null
+++ b/src/expr/binary_expr.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::BinaryExpr;
+use pyo3::prelude::*;
+
+#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyBinaryExpr {
+    expr: BinaryExpr,
+}
+
+impl From<PyBinaryExpr> for BinaryExpr {
+    fn from(expr: PyBinaryExpr) -> Self {
+        expr.expr
+    }
+}
+
+impl From<BinaryExpr> for PyBinaryExpr {
+    fn from(expr: BinaryExpr) -> PyBinaryExpr {
+        PyBinaryExpr { expr }
+    }
+}
+
+#[pymethods]
+impl PyBinaryExpr {
+    fn left(&self) -> PyExpr {
+        self.expr.left.as_ref().clone().into()
+    }
+
+    fn right(&self) -> PyExpr {
+        self.expr.right.as_ref().clone().into()
+    }
+
+    fn op(&self) -> String {
+        format!("{}", self.expr.op)
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.expr))
+    }
+}
diff --git a/src/expr/column.rs b/src/expr/column.rs
new file mode 100644
index 000000000..16b8bce3c
--- /dev/null
+++ b/src/expr/column.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Column;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Column", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyColumn {
+    pub col: Column,
+}
+
+impl PyColumn {
+    pub fn new(col: Column) -> Self {
+        Self { col }
+    }
+}
+
+impl From<Column> for PyColumn {
+    fn from(col: Column) -> PyColumn {
+        PyColumn { col }
+    }
+}
+
+#[pymethods]
+impl PyColumn {
+    /// Get the column name
+    fn name(&self) -> String {
+        self.col.name.clone()
+    }
+
+    /// Get the column relation
+    fn relation(&self) -> Option<String> {
+        self.col.relation.clone()
+    }
+
+    /// Get the fully-qualified column name
+    fn qualified_name(&self) -> String {
+        self.col.flat_name()
+    }
+
+    /// Get a String representation of this column
+    fn __repr__(&self) -> String {
+        self.qualified_name()
+    }
+}
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
new file mode 100644
index 000000000..b7b48b9d2
--- /dev/null
+++ b/src/expr/filter.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Filter;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Filter", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyFilter {
+    filter: Filter,
+}
+
+impl From<Filter> for PyFilter {
+    fn from(filter: Filter) -> PyFilter {
+        PyFilter { filter }
+    }
+}
+
+impl From<PyFilter> for Filter {
+    fn from(filter: PyFilter) -> Self {
+        filter.filter
+    }
+}
+
+impl Display for PyFilter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Filter
+            \nPredicate: {:?}
+            \nInput: {:?}",
+            &self.filter.predicate, &self.filter.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyFilter {
+    /// Retrieves the predicate expression for this `Filter`
+    fn predicate(&self) -> PyExpr {
+        PyExpr::from(self.filter.predicate.clone())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Filter` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.filter.input).clone())
+    }
+
+    /// Resulting Schema for this `Filter` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.filter.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> String {
+        format!("Filter({})", self)
+    }
+}
+
+impl LogicalNode for PyFilter {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.filter.input).clone())]
+    }
+}
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
new file mode 100644
index 000000000..a50e5b8aa
--- /dev/null
+++ b/src/expr/limit.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Limit;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLimit {
+    limit: Limit,
+}
+
+impl From<Limit> for PyLimit {
+    fn from(limit: Limit) -> PyLimit {
+        PyLimit { limit }
+    }
+}
+
+impl From<PyLimit> for Limit {
+    fn from(limit: PyLimit) -> Self {
+        limit.limit
+    }
+}
+
+impl Display for PyLimit {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Limit
+            \nSkip: {}
+            \nFetch: {:?}
+            \nInput: {:?}",
+            &self.limit.skip, &self.limit.fetch, &self.limit.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyLimit {
+    /// Retrieves the skip value for this `Limit`
+    fn skip(&self) -> usize {
+        self.limit.skip
+    }
+
+    /// Retrieves the fetch value for this `Limit`
+    fn fetch(&self) -> Option<usize> {
+        self.limit.fetch
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Limit` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.limit.input).clone())
+    }
+
+    /// Resulting Schema for this `Limit` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.limit.input.schema().as_ref().clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Limit({})", self))
+    }
+}
+
+impl LogicalNode for PyLimit {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.limit.input).clone())]
+    }
+}
diff --git a/src/expr/literal.rs b/src/expr/literal.rs
new file mode 100644
index 000000000..27674ce6f
--- /dev/null
+++ b/src/expr/literal.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::py_runtime_err;
+use datafusion_common::ScalarValue;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Literal", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLiteral {
+    pub value: ScalarValue,
+}
+
+impl From<PyLiteral> for ScalarValue {
+    fn from(lit: PyLiteral) -> ScalarValue {
+        lit.value
+    }
+}
+
+impl From<ScalarValue> for PyLiteral {
+    fn from(value: ScalarValue) -> PyLiteral {
+        PyLiteral { value }
+    }
+}
+
+#[pymethods]
+impl PyLiteral {
+    /// Get the data type of this literal value
+    fn data_type(&self) -> String {
+        format!("{}", self.value.get_datatype())
+    }
+
+    fn value_i32(&self) -> PyResult<i32> {
+        if let ScalarValue::Int32(Some(n)) = &self.value {
+            Ok(*n)
+        } else {
+            Err(py_runtime_err("Cannot access value as i32"))
+        }
+    }
+
+    fn value_i64(&self) -> PyResult<i64> {
+        if let ScalarValue::Int64(Some(n)) = &self.value {
+            Ok(*n)
+        } else {
+            Err(py_runtime_err("Cannot access value as i64"))
+        }
+    }
+
+    fn value_str(&self) -> PyResult<String> {
+        if let ScalarValue::Utf8(Some(str)) = &self.value {
+            Ok(str.clone())
+        } else {
+            Err(py_runtime_err("Cannot access value as string"))
+        }
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.value))
+    }
+}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 6d04e59a8..4c158f763 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -15,13 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
-use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::Projection;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
 use crate::common::df_schema::PyDFSchema;
-use crate::errors::py_runtime_err;
 use crate::expr::logical_node::LogicalNode;
 use crate::expr::PyExpr;
 use crate::sql::logical::PyLogicalPlan;
@@ -32,21 +30,21 @@ pub struct PyProjection {
     projection: Projection,
 }
 
+impl PyProjection {
+    pub fn new(projection: Projection) -> Self {
+        Self { projection }
+    }
+}
+
 impl From<Projection> for PyProjection {
     fn from(projection: Projection) -> PyProjection {
         PyProjection { projection }
     }
 }
 
-impl TryFrom<PyProjection> for Projection {
-    type Error = DataFusionError;
-
-    fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
-        Projection::try_new_with_schema(
-            py_proj.projection.expr,
-            py_proj.projection.input.clone(),
-            py_proj.projection.schema,
-        )
+impl From<PyProjection> for Projection {
+    fn from(proj: PyProjection) -> Self {
+        proj.projection
     }
 }
 
@@ -66,8 +64,7 @@ impl Display for PyProjection {
 #[pymethods]
 impl PyProjection {
     /// Retrieves the expressions for this `Projection`
-    #[pyo3(name = "projections")]
-    fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+    fn projections(&self) -> PyResult<Vec<PyExpr>> {
         Ok(self
             .projection
             .expr
@@ -76,25 +73,13 @@ impl PyProjection {
             .collect())
     }
 
-    // Retrieves the input `LogicalPlan` to this `Projection` node
-    #[pyo3(name = "input")]
-    fn py_input(&self) -> PyResult<PyLogicalPlan> {
-        // DataFusion make a loose guarantee that each Projection should have an input, however
-        // we check for that hear since we are performing explicit index retrieval
-        let inputs = LogicalNode::input(self);
-        if !inputs.is_empty() {
-            return Ok(inputs[0].clone());
-        }
-
-        Err(py_runtime_err(format!(
-            "Expected `input` field for Projection node: {}",
-            self
-        )))
+    /// Retrieves the input `LogicalPlan` to this `Projection` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.projection.input).clone())
     }
 
-    // Resulting Schema for this `Projection` node instance
-    #[pyo3(name = "schema")]
-    fn py_schema(&self) -> PyResult<PyDFSchema> {
+    /// Resulting Schema for this `Projection` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
         Ok((*self.projection.schema).clone().into())
     }
 
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
new file mode 100644
index 000000000..1d0a7f6d3
--- /dev/null
+++ b/src/expr/sort.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Sort;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Sort", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PySort {
+    sort: Sort,
+}
+
+impl From<Sort> for PySort {
+    fn from(sort: Sort) -> PySort {
+        PySort { sort }
+    }
+}
+
+impl TryFrom<PySort> for Sort {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PySort) -> Result<Self, Self::Error> {
+        Ok(agg.sort)
+    }
+}
+
+impl Display for PySort {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Sort
+            \nExpr(s): {:?}
+            \nInput: {:?}
+            \nSchema: {:?}",
+            &self.sort.expr,
+            self.sort.input,
+            self.sort.input.schema()
+        )
+    }
+}
+
+#[pymethods]
+impl PySort {
+    /// Retrieves the sort expressions for this `Sort`
+    fn sort_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .sort
+            .expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Sort` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.sort.input).clone())
+    }
+
+    /// Resulting Schema for this `Sort` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.sort.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Sort({})", self))
+    }
+}
+
+impl LogicalNode for PySort {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.sort.input).clone())]
+    }
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index 00504b97f..2784523e7 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -19,6 +19,8 @@ use datafusion_expr::logical_plan::TableScan;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
 use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
 
 #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
@@ -27,6 +29,12 @@ pub struct PyTableScan {
     table_scan: TableScan,
 }
 
+impl PyTableScan {
+    pub fn new(table_scan: TableScan) -> Self {
+        Self { table_scan }
+    }
+}
+
 impl From<PyTableScan> for TableScan {
     fn from(tbl_scan: PyTableScan) -> TableScan {
         tbl_scan.table_scan
@@ -117,3 +125,10 @@ impl PyTableScan {
         Ok(format!("TableScan({})", self))
     }
 }
+
+impl LogicalNode for PyTableScan {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        // table scans are leaf nodes and do not have inputs
+        vec![]
+    }
+}
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index dcd7baa58..ee48f1e17 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -17,6 +17,14 @@
 
 use std::sync::Arc;
 
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate::PyAggregate;
+use crate::expr::analyze::PyAnalyze;
+use crate::expr::filter::PyFilter;
+use crate::expr::limit::PyLimit;
+use crate::expr::projection::PyProjection;
+use crate::expr::sort::PySort;
+use crate::expr::table_scan::PyTableScan;
 use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;
 
@@ -33,12 +41,33 @@ impl PyLogicalPlan {
             plan: Arc::new(plan),
         }
     }
+
+    pub fn plan(&self) -> Arc<LogicalPlan> {
+        self.plan.clone()
+    }
 }
 
 #[pymethods]
 impl PyLogicalPlan {
+    /// Return the specific logical operator
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match self.plan.as_ref() {
+            LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
+            LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+            LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
+            LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
+            LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
+            LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)),
+            LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this plan to a LogicalNode: {:?}",
+                other
+            ))),
+        })
+    }
+
     /// Get the inputs to this plan
-    pub fn inputs(&self) -> Vec<PyLogicalPlan> {
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
         let mut inputs = vec![];
         for input in self.plan.inputs() {
             inputs.push(input.to_owned().into());
@@ -46,19 +75,23 @@ impl PyLogicalPlan {
         inputs
     }
 
-    pub fn display(&self) -> String {
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{:?}", self.plan))
+    }
+
+    fn display(&self) -> String {
         format!("{}", self.plan.display())
     }
 
-    pub fn display_indent(&self) -> String {
+    fn display_indent(&self) -> String {
         format!("{}", self.plan.display_indent())
     }
 
-    pub fn display_indent_schema(&self) -> String {
+    fn display_indent_schema(&self) -> String {
         format!("{}", self.plan.display_indent_schema())
     }
 
-    pub fn display_graphviz(&self) -> String {
-        format!("{}", self.plan.display_indent_schema())
+    fn display_graphviz(&self) -> String {
+        format!("{}", self.plan.display_graphviz())
     }
 }
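The `to_variant` method above maps each `LogicalPlan` variant onto its `pyclass` wrapper and raises for anything not yet covered. A minimal pure-Python sketch of that dispatch pattern, using hypothetical stand-in classes rather than the real `datafusion` extension module:

```python
# Hypothetical stand-ins for the logical plan nodes; the real bindings
# expose Projection, TableScan, etc. from the Rust extension module.
class Projection:
    def __init__(self, exprs, input_plan):
        self.exprs = exprs
        self.input = input_plan

class TableScan:
    def __init__(self, table_name):
        self.table_name = table_name

# Wrapper classes mirroring the Rust PyProjection / PyTableScan pyclasses.
class PyProjection:
    def __init__(self, plan):
        self.plan = plan

class PyTableScan:
    def __init__(self, plan):
        self.plan = plan

# Mirror of PyLogicalPlan::to_variant: dispatch on the concrete plan
# type, wrapping known variants and raising for everything else.
_VARIANTS = {Projection: PyProjection, TableScan: PyTableScan}

def to_variant(plan):
    wrapper = _VARIANTS.get(type(plan))
    if wrapper is None:
        raise RuntimeError(f"Cannot convert this plan to a LogicalNode: {plan!r}")
    return wrapper(plan)
```

From Python, a caller would walk a plan tree by alternating `to_variant` with each wrapper's `input()` accessor, which is exactly what the `inputs`/`to_variant` pair in `PyLogicalPlan` enables.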