From ea63638e280932b96db364eab2428f008f6438dc Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 13 Feb 2024 09:26:57 +0100
Subject: [PATCH 1/2] Move parquet specific tests into dedicated file

---
 dask_expr/io/tests/test_io.py      | 132 +----------------------------
 dask_expr/io/tests/test_parquet.py | 147 +++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 131 deletions(-)
 create mode 100644 dask_expr/io/tests/test_parquet.py

diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py
index f6b8e9802..4a111eaac 100644
--- a/dask_expr/io/tests/test_io.py
+++ b/dask_expr/io/tests/test_io.py
@@ -20,8 +20,7 @@
     read_csv,
     read_parquet,
 )
-from dask_expr._expr import Expr, Lengths, Literal, Replace
-from dask_expr._reductions import Len
+from dask_expr._expr import Expr, Replace
 from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet
 from dask_expr.tests._util import _backend_library
 
@@ -121,85 +120,6 @@ def test_read_csv_keywords(tmpdir):
     assert_eq(df, expected)
 
 
-@pytest.mark.skip()
-def test_predicate_pushdown(tmpdir):
-    original = pd.DataFrame(
-        {
-            "a": [1, 2, 3, 4, 5] * 10,
-            "b": [0, 1, 2, 3, 4] * 10,
-            "c": range(50),
-            "d": [6, 7] * 25,
-            "e": [8, 9] * 25,
-        }
-    )
-    fn = _make_file(tmpdir, format="parquet", df=original)
-    df = read_parquet(fn)
-    assert_eq(df, original)
-    x = df[df.a == 5][df.c > 20]["b"]
-    y = optimize(x, fuse=False)
-    assert isinstance(y.expr, ReadParquet)
-    assert ("a", "==", 5) in y.expr.operand("filters")[0]
-    assert ("c", ">", 20) in y.expr.operand("filters")[0]
-    assert list(y.columns) == ["b"]
-
-    # Check computed result
-    y_result = y.compute()
-    assert y_result.name == "b"
-    assert len(y_result) == 6
-    assert (y_result == 4).all()
-
-
-@pytest.mark.skip()
-def test_predicate_pushdown_compound(tmpdir):
-    pdf = pd.DataFrame(
-        {
-            "a": [1, 2, 3, 4, 5] * 10,
-            "b": [0, 1, 2, 3, 4] * 10,
-            "c": range(50),
-            "d": [6, 7] * 25,
-            "e": [8, 9] * 25,
-        }
-    )
-    fn = _make_file(tmpdir, format="parquet", df=pdf)
-    df = read_parquet(fn)
-
-    # Test AND
-    x = df[(df.a == 5) & (df.c > 20)]["b"]
-    y = optimize(x, fuse=False)
-    assert isinstance(y.expr, ReadParquet)
-    assert {("c", ">", 20), ("a", "==", 5)} == set(y.filters[0])
-    assert_eq(
-        y,
-        pdf[(pdf.a == 5) & (pdf.c > 20)]["b"],
-        check_index=False,
-    )
-
-    # Test OR
-    x = df[(df.a == 5) | (df.c > 20)]
-    x = x[x.b != 0]["b"]
-    y = optimize(x, fuse=False)
-    assert isinstance(y.expr, ReadParquet)
-    filters = [set(y.filters[0]), set(y.filters[1])]
-    assert {("c", ">", 20), ("b", "!=", 0)} in filters
-    assert {("a", "==", 5), ("b", "!=", 0)} in filters
-    expect = pdf[(pdf.a == 5) | (pdf.c > 20)]
-    expect = expect[expect.b != 0]["b"]
-    assert_eq(
-        y,
-        expect,
-        check_index=False,
-    )
-
-    # Test OR and AND
-    x = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"]
-    z = optimize(x, fuse=False)
-    assert isinstance(z.expr, ReadParquet)
-    filters = [set(z.filters[0]), set(z.filters[1])]
-    assert {("c", ">", 20), ("b", "!=", 0)} in filters
-    assert {("a", "==", 5), ("b", "!=", 0)} in filters
-    assert_eq(y, z)
-
-
 def test_io_fusion_blockwise(tmpdir):
     pdf = pd.DataFrame({c: range(10) for c in "abcdefghijklmn"})
     dd.from_pandas(pdf, 3).to_parquet(tmpdir)
@@ -288,27 +208,6 @@ def test_parquet_complex_filters(tmpdir):
     assert_eq(got.optimize(), expect)
 
 
-def test_parquet_len(tmpdir):
-    df = read_parquet(_make_file(tmpdir))
-    pdf = df.compute()
-
-    assert len(df[df.a > 5]) == len(pdf[pdf.a > 5])
-
-    s = (df["b"] + 1).astype("Int32")
-    assert len(s) == len(pdf)
-
-    assert isinstance(Len(s.expr).optimize(), Literal)
-    assert isinstance(Lengths(s.expr).optimize(), Literal)
-
-
-def test_parquet_len_filter(tmpdir):
-    df = read_parquet(_make_file(tmpdir))
-    expr = Len(df[df.c > 0].expr)
-    result = expr.simplify()
-    for rp in result.find_operations(ReadParquet):
-        assert rp.operand("columns") == ["c"] or rp.operand("columns") == []
-
-
 @pytest.mark.parametrize("optimize", [True, False])
 def test_from_dask_dataframe(optimize):
     ddf = dd.from_dict({"a": range(100)}, npartitions=10)
@@ -335,35 +234,6 @@ def test_to_dask_array(optimize):
     array_assert_eq(darr, pdf.values)
 
 
-@pytest.mark.parametrize("write_metadata_file", [True, False])
-def test_to_parquet(tmpdir, write_metadata_file):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
-    df = from_pandas(pdf, npartitions=2)
-
-    # Check basic parquet round trip
-    df.to_parquet(tmpdir, write_metadata_file=write_metadata_file)
-    df2 = read_parquet(tmpdir, calculate_divisions=True)
-    assert_eq(df, df2)
-
-    # Check overwrite behavior
-    df["new"] = df["x"] + 1
-    df.to_parquet(tmpdir, overwrite=True, write_metadata_file=write_metadata_file)
-    df2 = read_parquet(tmpdir, calculate_divisions=True)
-    assert_eq(df, df2)
-
-    # Check that we cannot overwrite a path we are
-    # reading from in the same graph
-    with pytest.raises(ValueError, match="Cannot overwrite"):
-        df2.to_parquet(tmpdir, overwrite=True)
-
-
-def test_to_parquet_engine(tmpdir):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
-    df = from_pandas(pdf, npartitions=2)
-    with pytest.raises(NotImplementedError, match="not supported"):
-        df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet")
-
-
 @pytest.mark.parametrize(
     "fmt,read_func,read_cls",
     [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)],
diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py
new file mode 100644
index 000000000..2f6d6660d
--- /dev/null
+++ b/dask_expr/io/tests/test_parquet.py
@@ -0,0 +1,147 @@
+import os
+
+import pandas as pd
+import pytest
+from dask.dataframe.utils import assert_eq
+
+from dask_expr import from_pandas, optimize, read_parquet
+from dask_expr._expr import Lengths, Literal
+from dask_expr._reductions import Len
+from dask_expr.io import ReadParquet
+
+
+def _make_file(dir, df=None):
+    fn = os.path.join(str(dir), f"myfile.{format}")
+    if df is None:
+        df = pd.DataFrame({c: range(10) for c in "abcde"})
+    df.to_parquet(fn)
+    return fn
+
+
+def test_parquet_len(tmpdir):
+    df = read_parquet(_make_file(tmpdir))
+    pdf = df.compute()
+
+    assert len(df[df.a > 5]) == len(pdf[pdf.a > 5])
+
+    s = (df["b"] + 1).astype("Int32")
+    assert len(s) == len(pdf)
+
+    assert isinstance(Len(s.expr).optimize(), 
Literal) - assert isinstance(Lengths(s.expr).optimize(), Literal) - - -def test_parquet_len_filter(tmpdir): - df = read_parquet(_make_file(tmpdir)) - expr = Len(df[df.c > 0].expr) - result = expr.simplify() - for rp in result.find_operations(ReadParquet): - assert rp.operand("columns") == ["c"] or rp.operand("columns") == [] - - @pytest.mark.parametrize("optimize", [True, False]) def test_from_dask_dataframe(optimize): ddf = dd.from_dict({"a": range(100)}, npartitions=10) @@ -335,35 +234,6 @@ def test_to_dask_array(optimize): array_assert_eq(darr, pdf.values) -@pytest.mark.parametrize("write_metadata_file", [True, False]) -def test_to_parquet(tmpdir, write_metadata_file): - pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) - df = from_pandas(pdf, npartitions=2) - - # Check basic parquet round trip - df.to_parquet(tmpdir, write_metadata_file=write_metadata_file) - df2 = read_parquet(tmpdir, calculate_divisions=True) - assert_eq(df, df2) - - # Check overwrite behavior - df["new"] = df["x"] + 1 - df.to_parquet(tmpdir, overwrite=True, write_metadata_file=write_metadata_file) - df2 = read_parquet(tmpdir, calculate_divisions=True) - assert_eq(df, df2) - - # Check that we cannot overwrite a path we are - # reading from in the same graph - with pytest.raises(ValueError, match="Cannot overwrite"): - df2.to_parquet(tmpdir, overwrite=True) - - -def test_to_parquet_engine(tmpdir): - pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) - df = from_pandas(pdf, npartitions=2) - with pytest.raises(NotImplementedError, match="not supported"): - df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet") - - @pytest.mark.parametrize( "fmt,read_func,read_cls", [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)], diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py new file mode 100644 index 000000000..2f6d6660d --- /dev/null +++ b/dask_expr/io/tests/test_parquet.py @@ -0,0 +1,146 @@ +import os + +import pandas as pd +import pytest +from dask.dataframe.utils import assert_eq + +from dask_expr import from_pandas, optimize, read_parquet +from dask_expr._expr import Lengths, Literal +from dask_expr._reductions import Len +from dask_expr.io import ReadParquet + + +def _make_file(dir, df=None): + fn = os.path.join(str(dir), f"myfile.{format}") + if df is None: + df = pd.DataFrame({c: range(10) for c in "abcde"}) + df.to_parquet(fn) + + +def test_parquet_len(tmpdir): + df = read_parquet(_make_file(tmpdir)) + pdf = df.compute() + + assert len(df[df.a > 5]) == len(pdf[pdf.a > 5]) + + s = (df["b"] + 1).astype("Int32") + assert len(s) == len(pdf) + + assert isinstance(Len(s.expr).optimize(), Literal) + assert isinstance(Lengths(s.expr).optimize(), Literal) + + +def test_parquet_len_filter(tmpdir): + df = read_parquet(_make_file(tmpdir)) + expr = Len(df[df.c > 0].expr) + result = expr.simplify() + for rp in result.find_operations(ReadParquet): + assert rp.operand("columns") == ["c"] or rp.operand("columns") == [] + + +@pytest.mark.parametrize("write_metadata_file", [True, False]) +def test_to_parquet(tmpdir, write_metadata_file): + pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) + df = from_pandas(pdf, npartitions=2) + + # Check basic parquet round trip + df.to_parquet(tmpdir, write_metadata_file=write_metadata_file) + df2 = read_parquet(tmpdir, calculate_divisions=True) + assert_eq(df, df2) + + # Check overwrite behavior + df["new"] = df["x"] + 1 + df.to_parquet(tmpdir, overwrite=True, write_metadata_file=write_metadata_file) + df2 = read_parquet(tmpdir, calculate_divisions=True) 
+    assert_eq(df, df2)
+
+    # Check that we cannot overwrite a path we are
+    # reading from in the same graph
+    with pytest.raises(ValueError, match="Cannot overwrite"):
+        df2.to_parquet(tmpdir, overwrite=True)
+
+
+def test_to_parquet_engine(tmpdir):
+    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
+    df = from_pandas(pdf, npartitions=2)
+    with pytest.raises(NotImplementedError, match="not supported"):
+        df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet")
+
+
+@pytest.mark.skip()
+def test_predicate_pushdown(tmpdir):
+    original = pd.DataFrame(
+        {
+            "a": [1, 2, 3, 4, 5] * 10,
+            "b": [0, 1, 2, 3, 4] * 10,
+            "c": range(50),
+            "d": [6, 7] * 25,
+            "e": [8, 9] * 25,
+        }
+    )
+    fn = _make_file(tmpdir, df=original)
+    df = read_parquet(fn)
+    assert_eq(df, original)
+    x = df[df.a == 5][df.c > 20]["b"]
+    y = optimize(x, fuse=False)
+    assert isinstance(y.expr, ReadParquet)
+    assert ("a", "==", 5) in y.expr.operand("filters")[0]
+    assert ("c", ">", 20) in y.expr.operand("filters")[0]
+    assert list(y.columns) == ["b"]
+
+    # Check computed result
+    y_result = y.compute()
+    assert y_result.name == "b"
+    assert len(y_result) == 6
+    assert (y_result == 4).all()
+
+
+@pytest.mark.skip()
+def test_predicate_pushdown_compound(tmpdir):
+    pdf = pd.DataFrame(
+        {
+            "a": [1, 2, 3, 4, 5] * 10,
+            "b": [0, 1, 2, 3, 4] * 10,
+            "c": range(50),
+            "d": [6, 7] * 25,
+            "e": [8, 9] * 25,
+        }
+    )
+    fn = _make_file(tmpdir, df=pdf)
+    df = read_parquet(fn)
+
+    # Test AND
+    x = df[(df.a == 5) & (df.c > 20)]["b"]
+    y = optimize(x, fuse=False)
+    assert isinstance(y.expr, ReadParquet)
+    assert {("c", ">", 20), ("a", "==", 5)} == set(y.filters[0])
+    assert_eq(
+        y,
+        pdf[(pdf.a == 5) & (pdf.c > 20)]["b"],
+        check_index=False,
+    )
+
+    # Test OR
+    x = df[(df.a == 5) | (df.c > 20)]
+    x = x[x.b != 0]["b"]
+    y = optimize(x, fuse=False)
+    assert isinstance(y.expr, ReadParquet)
+    filters = [set(y.filters[0]), set(y.filters[1])]
+    assert {("c", ">", 20), ("b", "!=", 0)} in filters
+    assert {("a", "==", 5), ("b", "!=", 0)} in filters
+    expect = pdf[(pdf.a == 5) | (pdf.c > 20)]
+    expect = expect[expect.b != 0]["b"]
+    assert_eq(
+        y,
+        expect,
+        check_index=False,
+    )
+
+    # Test OR and AND
+    x = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"]
+    z = optimize(x, fuse=False)
+    assert isinstance(z.expr, ReadParquet)
+    filters = [set(z.filters[0]), set(z.filters[1])]
+    assert {("c", ">", 20), ("b", "!=", 0)} in filters
+    assert {("a", "==", 5), ("b", "!=", 0)} in filters
+    assert_eq(y, z)

From a40c079b0f372af5d21162847896ec7a50f5a806 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Tue, 13 Feb 2024 10:32:12 +0100
Subject: [PATCH 2/2] Update dask_expr/io/tests/test_parquet.py

---
 dask_expr/io/tests/test_parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py
index 2f6d6660d..40be539c1 100644
--- a/dask_expr/io/tests/test_parquet.py
+++ b/dask_expr/io/tests/test_parquet.py
@@ -11,7 +11,7 @@
 
 
 def _make_file(dir, df=None):
-    fn = os.path.join(str(dir), f"myfile.{format}")
+    fn = os.path.join(str(dir), "myfile.parquet")
     if df is None:
         df = pd.DataFrame({c: range(10) for c in "abcde"})
     df.to_parquet(fn)
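---

Note: the parquet round trip that the relocated test_to_parquet exercises can be
reproduced outside pytest. A minimal sketch, assuming dask-expr with a
parquet-capable engine (e.g. pyarrow) is installed; the "pq_scratch" directory
prefix is hypothetical and not part of the patch:

    import tempfile

    import pandas as pd
    from dask.dataframe.utils import assert_eq

    from dask_expr import from_pandas, read_parquet

    # Write two partitions to a scratch directory, then read them back,
    # deriving divisions from the parquet metadata.
    scratch = tempfile.mkdtemp(prefix="pq_scratch")
    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
    df = from_pandas(pdf, npartitions=2)
    df.to_parquet(scratch, write_metadata_file=True)
    df2 = read_parquet(scratch, calculate_divisions=True)
    assert_eq(df, df2)  # same data back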