From 832bac82ca7df1d7dfb3fa114309e79a59dc6791 Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 26 Feb 2023 11:55:02 +0100 Subject: [PATCH 1/5] Implement from_arrow_table(), from_pydict() & from_pylist() --- datafusion/tests/test_context.py | 45 ++++++++++++++++++++++++++++ src/context.rs | 51 ++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index efa2eded5..56370a186 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -95,6 +95,51 @@ def test_create_dataframe_registers_unique_table_name(ctx): assert c in "0123456789abcdef" +def test_from_arrow_table(ctx): + # create a PyArrow table + data = {"a": [1, 2, 3], "b": [4, 5, 6]} + table = pa.Table.from_pydict(data) + + # convert to DataFrame + df = ctx.from_arrow_table(table) + tables = list(ctx.tables()) + + assert df + assert len(tables) == 1 + assert set(df.schema().names) == {"a", "b"} + assert df.collect()[0].num_rows == 3 + + +def test_from_pylist(ctx): + # create a dataframe from Python list + data = [ + {"a": 1, "b": 4}, + {"a": 2, "b": 5}, + {"a": 3, "b": 6}, + ] + + df = ctx.from_pylist(data) + tables = list(ctx.tables()) + + assert df + assert len(tables) == 1 + assert set(df.schema().names) == {"a", "b"} + assert df.collect()[0].num_rows == 3 + + +def test_from_pydict(ctx): + # create a dataframe from Python dictionary + data = {"a": [1, 2, 3], "b": [4, 5, 6]} + + df = ctx.from_pydict(data) + tables = list(ctx.tables()) + + assert df + assert len(tables) == 1 + assert set(df.schema().names) == {"a", "b"} + assert df.collect()[0].num_rows == 3 + + def test_register_table(ctx, database): default = ctx.catalog() public = default.database("public") diff --git a/src/context.rs b/src/context.rs index 1acf5f289..1ddb916b0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -50,6 +50,7 @@ use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions, }; use datafusion_common::ScalarValue; +use pyo3::types::PyTuple; use tokio::runtime::Runtime; use tokio::task::JoinHandle; @@ -302,6 +303,56 @@ impl PySessionContext { PyDataFrame::new(DataFrame::new(self.ctx.state(), plan.plan.as_ref().clone())) } + fn from_pylist(&mut self, data: PyObject, py: Python) -> PyResult { + Python::with_gil(|py| { + // Instantiate pyarrow Table object & convert to batches + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data]); + let table = table_class + .call_method1("from_pylist", args)? + .call_method0("to_batches")?; + + // Cast PyObject to Recordbatch type + // Because create_dataframe() expects a vector of vectors of record batches + // we need to wrap the record batches in an additional vector + let batches = table.extract::>>()?; + let list_of_batches = PyArrowType::try_from(vec![batches.0])?; + self.create_dataframe(list_of_batches, py) + }) + } + + fn from_pydict(&mut self, data: PyObject, py: Python) -> PyResult { + Python::with_gil(|py| { + // Instantiate pyarrow Table object & convert to batches + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data]); + let table = table_class + .call_method1("from_pydict", args)? + .call_method0("to_batches")?; + + // Cast PyObject to Recordbatch type + // Because create_dataframe() expects a vector of vectors of record batches + // we need to wrap the record batches in an additional vector + let batches = table.extract::>>()?; + let list_of_batches = PyArrowType::try_from(vec![batches.0])?; + self.create_dataframe(list_of_batches, py) + }) + } + + fn from_arrow_table(&mut self, data: PyObject, py: Python) -> PyResult { + Python::with_gil(|py| { + // Instantiate pyarrow Table object & convert to batches + let table = data.call_method0(py, "to_batches")?; + + // Cast PyObject to Recordbatch type + // Because create_dataframe() expects a vector of vectors of record batches + // we need to wrap the record batches in an additional vector + let batches = table.extract::>>(py)?; + let list_of_batches = PyArrowType::try_from(vec![batches.0])?; + self.create_dataframe(list_of_batches, py) + }) + } + fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> { self.ctx .register_table(name, table.table()) From 44314fdad1b74f90470a1fe826290b3eee9506fa Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 26 Feb 2023 12:07:21 +0100 Subject: [PATCH 2/5] Refactor functions --- src/context.rs | 38 +++++++++++++------------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/src/context.rs b/src/context.rs index 1ddb916b0..43fa3d111 100644 --- a/src/context.rs +++ b/src/context.rs @@ -303,50 +303,38 @@ impl PySessionContext { PyDataFrame::new(DataFrame::new(self.ctx.state(), plan.plan.as_ref().clone())) } - fn from_pylist(&mut self, data: PyObject, py: Python) -> PyResult { + fn from_pylist(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { - // Instantiate pyarrow Table object & convert to batches + // Instantiate pyarrow Table object & convert to Arrow Table let table_class = py.import("pyarrow")?.getattr("Table")?; let args = PyTuple::new(py, &[data]); - let table = table_class - .call_method1("from_pylist", args)? - .call_method0("to_batches")?; + let table = table_class.call_method1("from_pylist", args)?.into(); - // Cast PyObject to Recordbatch type - // Because create_dataframe() expects a vector of vectors of record batches - // we need to wrap the record batches in an additional vector - let batches = table.extract::>>()?; - let list_of_batches = PyArrowType::try_from(vec![batches.0])?; - self.create_dataframe(list_of_batches, py) + let df = self.from_arrow_table(table, py)?; + Ok(df) }) } - fn from_pydict(&mut self, data: PyObject, py: Python) -> PyResult { + fn from_pydict(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { - // Instantiate pyarrow Table object & convert to batches + // Instantiate pyarrow Table object & convert to Arrow Table let table_class = py.import("pyarrow")?.getattr("Table")?; let args = PyTuple::new(py, &[data]); - let table = table_class - .call_method1("from_pydict", args)? - .call_method0("to_batches")?; + let table = table_class.call_method1("from_pydict", args)?.into(); - // Cast PyObject to Recordbatch type - // Because create_dataframe() expects a vector of vectors of record batches - // we need to wrap the record batches in an additional vector - let batches = table.extract::>>()?; - let list_of_batches = PyArrowType::try_from(vec![batches.0])?; - self.create_dataframe(list_of_batches, py) + let df = self.from_arrow_table(table, py)?; + Ok(df) }) } - fn from_arrow_table(&mut self, data: PyObject, py: Python) -> PyResult { + fn from_arrow_table(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to batches let table = data.call_method0(py, "to_batches")?; - // Cast PyObject to Recordbatch type + // Cast PyObject to RecordBatch type // Because create_dataframe() expects a vector of vectors of record batches - // we need to wrap the record batches in an additional vector + // here we need to wrap the vector of record batches in an additional vector let batches = table.extract::>>(py)?; let list_of_batches = PyArrowType::try_from(vec![batches.0])?; self.create_dataframe(list_of_batches, py) From dfc19a1b2369a63e9c6359e69fae25abb6d09cff Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 26 Feb 2023 13:55:56 +0100 Subject: [PATCH 3/5] Implement from_pandas() & from_polars() --- datafusion/tests/test_context.py | 36 ++++++++++++++++++++++++++++++++ src/context.rs | 31 +++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py index 56370a186..0cdf38086 100644 --- a/datafusion/tests/test_context.py +++ b/datafusion/tests/test_context.py @@ -26,6 +26,7 @@ SessionContext, SessionConfig, RuntimeConfig, + DataFrame, ) import pytest @@ -106,6 +107,7 @@ def test_from_arrow_table(ctx): assert df assert len(tables) == 1 + assert type(df) == DataFrame assert set(df.schema().names) == {"a", "b"} assert df.collect()[0].num_rows == 3 @@ -123,6 +125,7 @@ def test_from_pylist(ctx): assert df assert len(tables) == 1 + assert type(df) == DataFrame assert set(df.schema().names) == {"a", "b"} assert df.collect()[0].num_rows == 3 @@ -136,6 +139,39 @@ def test_from_pydict(ctx): assert df assert len(tables) == 1 + assert type(df) == DataFrame + assert set(df.schema().names) == {"a", "b"} + assert df.collect()[0].num_rows == 3 + + +def test_from_pandas(ctx): + # create a dataframe from pandas dataframe + pd = pytest.importorskip("pandas") + data = {"a": [1, 2, 3], "b": [4, 5, 6]} + pandas_df = pd.DataFrame(data) + + df = ctx.from_pandas(pandas_df) + tables = list(ctx.tables()) + + assert df + assert len(tables) == 1 + assert type(df) == DataFrame + assert set(df.schema().names) == {"a", "b"} + assert df.collect()[0].num_rows == 3 + + +def test_from_polars(ctx): + # create a dataframe from Polars dataframe + pd = pytest.importorskip("polars") + data = {"a": [1, 2, 3], "b": [4, 5, 6]} + polars_df = pd.DataFrame(data) + + df = ctx.from_polars(polars_df) + tables = list(ctx.tables()) + + assert df + assert len(tables) == 1 + assert type(df) == DataFrame assert set(df.schema().names) == {"a", "b"} assert df.collect()[0].num_rows == 3 diff --git a/src/context.rs b/src/context.rs index 43fa3d111..be6c93b42 100644 --- a/src/context.rs +++ b/src/context.rs @@ -303,6 +303,7 @@ impl PySessionContext { PyDataFrame::new(DataFrame::new(self.ctx.state(), plan.plan.as_ref().clone())) } + /// Construct datafusion dataframe from Python list fn from_pylist(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to Arrow Table @@ -310,11 +311,13 @@ impl PySessionContext { let args = PyTuple::new(py, &[data]); let table = table_class.call_method1("from_pylist", args)?.into(); + // Convert Arrow Table to datafusion DataFrame let df = self.from_arrow_table(table, py)?; Ok(df) }) } + /// Construct datafusion dataframe from Python dictionary fn from_pydict(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to Arrow Table @@ -322,11 +325,13 @@ impl PySessionContext { let args = PyTuple::new(py, &[data]); let table = table_class.call_method1("from_pydict", args)?.into(); + // Convert Arrow Table to datafusion DataFrame let df = self.from_arrow_table(table, py)?; Ok(df) }) } + /// Construct datafusion dataframe from Arrow Table fn from_arrow_table(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to batches @@ -341,6 +346,32 @@ impl PySessionContext { }) } + /// Construct datafusion dataframe from pandas + fn from_pandas(&mut self, data: PyObject, _py: Python) -> PyResult { + Python::with_gil(|py| { + // Instantiate pyarrow Table object & convert to Arrow Table + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data]); + let table = table_class.call_method1("from_pandas", args)?.into(); + + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, py)?; + Ok(df) + }) + } + + /// Construct datafusion dataframe from polars + fn from_polars(&mut self, data: PyObject, _py: Python) -> PyResult { + Python::with_gil(|py| { + // Convert Polars dataframe to Arrow Table + let table = data.call_method0(py, "to_arrow")?.into(); + + // Convert Arrow Table to datafusion DataFrame + let df = self.from_arrow_table(table, py)?; + Ok(df) + }) + } + fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> { self.ctx .register_table(name, table.table()) From 686c8c9aaeca53db8343aef134e5785707004b66 Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 26 Feb 2023 14:11:52 +0100 Subject: [PATCH 4/5] Document new import functions --- examples/export.py | 9 ++----- examples/import.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 examples/import.py diff --git a/examples/export.py b/examples/export.py index b7fe2b603..d179bf39d 100644 --- a/examples/export.py +++ b/examples/export.py @@ -16,18 +16,13 @@ # under the License. import datafusion -import pyarrow # create a context ctx = datafusion.SessionContext() -# create a RecordBatch and a new datafusion DataFrame from it -batch = pyarrow.RecordBatch.from_arrays( - [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])], - names=["a", "b"], -) -df = ctx.create_dataframe([[batch]]) +# create a new datafusion DataFrame +df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) # Dataframe: # +---+---+ # | a | b | diff --git a/examples/import.py b/examples/import.py new file mode 100644 index 000000000..a249a1c4e --- /dev/null +++ b/examples/import.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datafusion +import pyarrow as pa +import pandas as pd +import polars as pl + + +# Create a context +ctx = datafusion.SessionContext() + +# Create a datafusion DataFrame from a Python dictionary +# The dictionary keys represent column names and the dictionary values +# represent column values +df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) +assert type(df) == datafusion.DataFrame +# Dataframe: +# +---+---+ +# | a | b | +# +---+---+ +# | 1 | 4 | +# | 2 | 5 | +# | 3 | 6 | +# +---+---+ + +# Create a datafusion DataFrame from a Python list of rows +df = ctx.from_pylist([{"a": 1, "b": 4}, {"a": 2, "b": 5}, {"a": 3, "b": 6}]) +assert type(df) == datafusion.DataFrame + +# Convert pandas DataFrame to datafusion DataFrame +pandas_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) +df = ctx.from_pandas(pandas_df) +assert type(df) == datafusion.DataFrame + +# Convert polars DataFrame to datafusion DataFrame +polars_df = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) +df = ctx.from_polars(polars_df) +assert type(df) == datafusion.DataFrame + +# Convert Arrow Table to datafusion DataFrame +arrow_table = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) +df = ctx.from_arrow_table(arrow_table) +assert type(df) == datafusion.DataFrame From 7fde42d11c783581efb0b39c332b593f2124251d Mon Sep 17 00:00:00 2001 From: Dejan Simic <10134699+simicd@users.noreply.github.com> Date: Sun, 26 Feb 2023 14:45:09 +0100 Subject: [PATCH 5/5] Fix clippy errors --- src/context.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index be6c93b42..12f01d156 100644 --- a/src/context.rs +++ b/src/context.rs @@ -304,6 +304,7 @@ impl PySessionContext { } /// Construct datafusion dataframe from Python list + #[allow(clippy::wrong_self_convention)] fn from_pylist(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to Arrow Table @@ -318,6 +319,7 @@ impl PySessionContext { } /// Construct datafusion dataframe from Python dictionary + #[allow(clippy::wrong_self_convention)] fn from_pydict(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to Arrow Table @@ -332,6 +334,7 @@ impl PySessionContext { } /// Construct datafusion dataframe from Arrow Table + #[allow(clippy::wrong_self_convention)] fn from_arrow_table(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to batches @@ -347,6 +350,7 @@ impl PySessionContext { } /// Construct datafusion dataframe from pandas + #[allow(clippy::wrong_self_convention)] fn from_pandas(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Instantiate pyarrow Table object & convert to Arrow Table @@ -361,10 +365,11 @@ impl PySessionContext { } /// Construct datafusion dataframe from polars + #[allow(clippy::wrong_self_convention)] fn from_polars(&mut self, data: PyObject, _py: Python) -> PyResult { Python::with_gil(|py| { // Convert Polars dataframe to Arrow Table - let table = data.call_method0(py, "to_arrow")?.into(); + let table = data.call_method0(py, "to_arrow")?; // Convert Arrow Table to datafusion DataFrame let df = self.from_arrow_table(table, py)?;