From 05b9dfd3fbdc1c5d0db28d702217b7ee8f3c985a Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 13 Feb 2024 09:26:57 +0100 Subject: [PATCH 01/12] Move parquet specific tests into dedicated file --- dask_expr/io/tests/test_io.py | 132 +------------------------- dask_expr/io/tests/test_parquet.py | 146 +++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 131 deletions(-) create mode 100644 dask_expr/io/tests/test_parquet.py diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py index cca5d63c0..f28772594 100644 --- a/dask_expr/io/tests/test_io.py +++ b/dask_expr/io/tests/test_io.py @@ -21,8 +21,7 @@ read_csv, read_parquet, ) -from dask_expr._expr import Expr, Lengths, Literal, Replace -from dask_expr._reductions import Len +from dask_expr._expr import Expr, Replace from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet from dask_expr.tests._util import _backend_library @@ -122,85 +121,6 @@ def test_read_csv_keywords(tmpdir): assert_eq(df, expected) -@pytest.mark.skip() -def test_predicate_pushdown(tmpdir): - original = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5] * 10, - "b": [0, 1, 2, 3, 4] * 10, - "c": range(50), - "d": [6, 7] * 25, - "e": [8, 9] * 25, - } - ) - fn = _make_file(tmpdir, format="parquet", df=original) - df = read_parquet(fn) - assert_eq(df, original) - x = df[df.a == 5][df.c > 20]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - assert ("a", "==", 5) in y.expr.operand("filters")[0] - assert ("c", ">", 20) in y.expr.operand("filters")[0] - assert list(y.columns) == ["b"] - - # Check computed result - y_result = y.compute() - assert y_result.name == "b" - assert len(y_result) == 6 - assert (y_result == 4).all() - - -@pytest.mark.skip() -def test_predicate_pushdown_compound(tmpdir): - pdf = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5] * 10, - "b": [0, 1, 2, 3, 4] * 10, - "c": range(50), - "d": [6, 7] * 25, - "e": [8, 9] * 25, - } - ) - fn = _make_file(tmpdir, format="parquet", df=pdf) - df = read_parquet(fn) - - # Test AND - x = df[(df.a == 5) & (df.c > 20)]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - assert {("c", ">", 20), ("a", "==", 5)} == set(y.filters[0]) - assert_eq( - y, - pdf[(pdf.a == 5) & (pdf.c > 20)]["b"], - check_index=False, - ) - - # Test OR - x = df[(df.a == 5) | (df.c > 20)] - x = x[x.b != 0]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - filters = [set(y.filters[0]), set(y.filters[1])] - assert {("c", ">", 20), ("b", "!=", 0)} in filters - assert {("a", "==", 5), ("b", "!=", 0)} in filters - expect = pdf[(pdf.a == 5) | (pdf.c > 20)] - expect = expect[expect.b != 0]["b"] - assert_eq( - y, - expect, - check_index=False, - ) - - # Test OR and AND - x = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"] - z = optimize(x, fuse=False) - assert isinstance(z.expr, ReadParquet) - filters = [set(z.filters[0]), set(z.filters[1])] - assert {("c", ">", 20), ("b", "!=", 0)} in filters - assert {("a", "==", 5), ("b", "!=", 0)} in filters - assert_eq(y, z) - - def test_io_fusion_blockwise(tmpdir): pdf = pd.DataFrame({c: range(10) for c in "abcdefghijklmn"}) dd.from_pandas(pdf, 3).to_parquet(tmpdir) @@ -289,27 +209,6 @@ def test_parquet_complex_filters(tmpdir): assert_eq(got.optimize(), expect) -def test_parquet_len(tmpdir): - df = read_parquet(_make_file(tmpdir)) - pdf = df.compute() - - assert len(df[df.a > 5]) == len(pdf[pdf.a > 5]) - - s = (df["b"] + 1).astype("Int32") - assert len(s) == len(pdf) - - assert 
isinstance(Len(s.expr).optimize(), Literal) - assert isinstance(Lengths(s.expr).optimize(), Literal) - - -def test_parquet_len_filter(tmpdir): - df = read_parquet(_make_file(tmpdir)) - expr = Len(df[df.c > 0].expr) - result = expr.simplify() - for rp in result.find_operations(ReadParquet): - assert rp.operand("columns") == ["c"] or rp.operand("columns") == [] - - @pytest.mark.parametrize("optimize", [True, False]) def test_from_dask_dataframe(optimize): ddf = dd.from_dict({"a": range(100)}, npartitions=10) @@ -336,35 +235,6 @@ def test_to_dask_array(optimize): array_assert_eq(darr, pdf.values) -@pytest.mark.parametrize("write_metadata_file", [True, False]) -def test_to_parquet(tmpdir, write_metadata_file): - pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) - df = from_pandas(pdf, npartitions=2) - - # Check basic parquet round trip - df.to_parquet(tmpdir, write_metadata_file=write_metadata_file) - df2 = read_parquet(tmpdir, calculate_divisions=True) - assert_eq(df, df2) - - # Check overwrite behavior - df["new"] = df["x"] + 1 - df.to_parquet(tmpdir, overwrite=True, write_metadata_file=write_metadata_file) - df2 = read_parquet(tmpdir, calculate_divisions=True) - assert_eq(df, df2) - - # Check that we cannot overwrite a path we are - # reading from in the same graph - with pytest.raises(ValueError, match="Cannot overwrite"): - df2.to_parquet(tmpdir, overwrite=True) - - -def test_to_parquet_engine(tmpdir): - pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) - df = from_pandas(pdf, npartitions=2) - with pytest.raises(NotImplementedError, match="not supported"): - df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet") - - @pytest.mark.parametrize( "fmt,read_func,read_cls", [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)], diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py new file mode 100644 index 000000000..2f6d6660d --- /dev/null +++ b/dask_expr/io/tests/test_parquet.py @@ -0,0 +1,146 @@ +import os + +import pandas as pd +import pytest +from dask.dataframe.utils import assert_eq + +from dask_expr import from_pandas, optimize, read_parquet +from dask_expr._expr import Lengths, Literal +from dask_expr._reductions import Len +from dask_expr.io import ReadParquet + + +def _make_file(dir, df=None): + fn = os.path.join(str(dir), f"myfile.{format}") + if df is None: + df = pd.DataFrame({c: range(10) for c in "abcde"}) + df.to_parquet(fn) + + +def test_parquet_len(tmpdir): + df = read_parquet(_make_file(tmpdir)) + pdf = df.compute() + + assert len(df[df.a > 5]) == len(pdf[pdf.a > 5]) + + s = (df["b"] + 1).astype("Int32") + assert len(s) == len(pdf) + + assert isinstance(Len(s.expr).optimize(), Literal) + assert isinstance(Lengths(s.expr).optimize(), Literal) + + +def test_parquet_len_filter(tmpdir): + df = read_parquet(_make_file(tmpdir)) + expr = Len(df[df.c > 0].expr) + result = expr.simplify() + for rp in result.find_operations(ReadParquet): + assert rp.operand("columns") == ["c"] or rp.operand("columns") == [] + + +@pytest.mark.parametrize("write_metadata_file", [True, False]) +def test_to_parquet(tmpdir, write_metadata_file): + pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) + df = from_pandas(pdf, npartitions=2) + + # Check basic parquet round trip + df.to_parquet(tmpdir, write_metadata_file=write_metadata_file) + df2 = read_parquet(tmpdir, calculate_divisions=True) + assert_eq(df, df2) + + # Check overwrite behavior + df["new"] = df["x"] + 1 + df.to_parquet(tmpdir, overwrite=True, write_metadata_file=write_metadata_file) + df2 = 
read_parquet(tmpdir, calculate_divisions=True) + assert_eq(df, df2) + + # Check that we cannot overwrite a path we are + # reading from in the same graph + with pytest.raises(ValueError, match="Cannot overwrite"): + df2.to_parquet(tmpdir, overwrite=True) + + +def test_to_parquet_engine(tmpdir): + pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]}) + df = from_pandas(pdf, npartitions=2) + with pytest.raises(NotImplementedError, match="not supported"): + df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet") + + +@pytest.mark.skip() +def test_predicate_pushdown(tmpdir): + original = pd.DataFrame( + { + "a": [1, 2, 3, 4, 5] * 10, + "b": [0, 1, 2, 3, 4] * 10, + "c": range(50), + "d": [6, 7] * 25, + "e": [8, 9] * 25, + } + ) + fn = _make_file(tmpdir, format="parquet", df=original) + df = read_parquet(fn) + assert_eq(df, original) + x = df[df.a == 5][df.c > 20]["b"] + y = optimize(x, fuse=False) + assert isinstance(y.expr, ReadParquet) + assert ("a", "==", 5) in y.expr.operand("filters")[0] + assert ("c", ">", 20) in y.expr.operand("filters")[0] + assert list(y.columns) == ["b"] + + # Check computed result + y_result = y.compute() + assert y_result.name == "b" + assert len(y_result) == 6 + assert (y_result == 4).all() + + +@pytest.mark.skip() +def test_predicate_pushdown_compound(tmpdir): + pdf = pd.DataFrame( + { + "a": [1, 2, 3, 4, 5] * 10, + "b": [0, 1, 2, 3, 4] * 10, + "c": range(50), + "d": [6, 7] * 25, + "e": [8, 9] * 25, + } + ) + fn = _make_file(tmpdir, format="parquet", df=pdf) + df = read_parquet(fn) + + # Test AND + x = df[(df.a == 5) & (df.c > 20)]["b"] + y = optimize(x, fuse=False) + assert isinstance(y.expr, ReadParquet) + assert {("c", ">", 20), ("a", "==", 5)} == set(y.filters[0]) + assert_eq( + y, + pdf[(pdf.a == 5) & (pdf.c > 20)]["b"], + check_index=False, + ) + + # Test OR + x = df[(df.a == 5) | (df.c > 20)] + x = x[x.b != 0]["b"] + y = optimize(x, fuse=False) + assert isinstance(y.expr, ReadParquet) + filters = [set(y.filters[0]), set(y.filters[1])] + assert {("c", ">", 20), ("b", "!=", 0)} in filters + assert {("a", "==", 5), ("b", "!=", 0)} in filters + expect = pdf[(pdf.a == 5) | (pdf.c > 20)] + expect = expect[expect.b != 0]["b"] + assert_eq( + y, + expect, + check_index=False, + ) + + # Test OR and AND + x = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"] + z = optimize(x, fuse=False) + assert isinstance(z.expr, ReadParquet) + filters = [set(z.filters[0]), set(z.filters[1])] + assert {("c", ">", 20), ("b", "!=", 0)} in filters + assert {("a", "==", 5), ("b", "!=", 0)} in filters + assert_eq(y, z) From 49fc0ce7e5a8d444872baaaeaa84aeefc21a6ee7 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 16 Feb 2024 15:05:13 +0100 Subject: [PATCH 02/12] read_parquet POC --- dask_expr/_collection.py | 58 +++- dask_expr/io/parquet.py | 500 ++++++++++++++++++++++++----- dask_expr/io/tests/test_parquet.py | 23 +- 3 files changed, 495 insertions(+), 86 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index c773b1db6..af7a376c3 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -4606,7 +4606,11 @@ def read_parquet( engine=None, **kwargs, ): - from dask_expr.io.parquet import ReadParquet, _set_parquet_engine + from dask_expr.io.parquet import ( + ReadParquetFSSpec, + ReadParquetPyarrowFS, + _set_parquet_engine, + ) if not isinstance(path, str): path = stringify_path(path) @@ -4618,9 +4622,59 @@ def read_parquet( col, op, val = filter if op == "in" and not isinstance(val, (set, list, tuple)): raise TypeError("Value of 'in' filter must be 
a list, set or tuple.") + from pyarrow import fs as pa_fs + + if ( + isinstance(filesystem, pa_fs.FileSystem) + or isinstance(filesystem, str) + and filesystem.lower() in ("arrow", "pyarrow") + ): + if calculate_divisions: + raise NotImplementedError( + "calculate_divisions is not supported when using the pyarrow filesystem." + ) + if metadata_task_size is not None: + raise NotImplementedError( + "metadata_task_size is not supported when using the pyarrow filesystem." + ) + if split_row_groups != "infer": + raise NotImplementedError( + "split_row_groups is not supported when using the pyarrow filesystem." + ) + if blocksize != "default": + raise NotImplementedError( + "blocksize is not supported when using the pyarrow filesystem." + ) + if aggregate_files is not None: + raise NotImplementedError( + "aggregate_files is not supported when using the pyarrow filesystem." + ) + if parquet_file_extension != (".parq", ".parquet", ".pq"): + raise NotImplementedError( + "parquet_file_extension is not supported when using the pyarrow filesystem." + ) + if engine is not None: + raise NotImplementedError( + "engine is not supported when using the pyarrow filesystem." + ) + + return new_collection( + ReadParquetPyarrowFS( + path, + columns=_convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + storage_options=storage_options, + filesystem=filesystem, + ignore_metadata_file=ignore_metadata_file, + kwargs=kwargs, + _series=isinstance(columns, str), + ) + ) return new_collection( - ReadParquet( + ReadParquetFSSpec( path, columns=_convert_to_list(columns), filters=filters, diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 8cd9e8c3d..035f98363 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -4,12 +4,15 @@ import itertools import operator import warnings +from abc import abstractmethod from collections import defaultdict from functools import cached_property import dask +import pandas as pd import pyarrow as pa import pyarrow.dataset as pa_ds +import pyarrow.fs as pa_fs import pyarrow.parquet as pq import tlz as toolz from dask.base import normalize_token, tokenize @@ -50,6 +53,21 @@ from dask_expr._util import _convert_to_list, _tokenize_deterministic from dask_expr.io import BlockwiseIO, PartitionsFiltered +PYARROW_NULLABLE_DTYPE_MAPPING = { + pa.int8(): pd.Int8Dtype(), + pa.int16(): pd.Int16Dtype(), + pa.int32(): pd.Int32Dtype(), + pa.int64(): pd.Int64Dtype(), + pa.uint8(): pd.UInt8Dtype(), + pa.uint16(): pd.UInt16Dtype(), + pa.uint32(): pd.UInt32Dtype(), + pa.uint64(): pd.UInt64Dtype(), + pa.bool_(): pd.BooleanDtype(), + pa.string(): pd.StringDtype(), + pa.float32(): pd.Float32Dtype(), + pa.float64(): pd.Float64Dtype(), +} + NONE_LABEL = "__null_dask_index__" _CACHED_PLAN_SIZE = 10 @@ -406,9 +424,61 @@ def to_parquet( return out -class ReadParquet(PartitionsFiltered, BlockwiseIO): - """Read a parquet dataset""" +# class ReadParquetPyarrowFSNoMeta(ReadParquetPyarrowFS): + +# def _lower(self): +# return ReadParquetPyarrowFS( + + +# ) +def _determine_type_mapper( + *, user_types_mapper=None, dtype_backend=None, convert_string=True +): + type_mappers = [] + + def pyarrow_type_mapper(pyarrow_dtype): + # Special case pyarrow strings to use more feature complete dtype + # See https://github.com/pandas-dev/pandas/issues/50074 + if pyarrow_dtype == pa.string(): + return pd.StringDtype("pyarrow") + else: + return pd.ArrowDtype(pyarrow_dtype) + + # always use the user-defined mapper first, if available + if user_types_mapper is not None: + 
type_mappers.append(user_types_mapper) + # next in priority is converting strings + if convert_string: + type_mappers.append({pa.string(): pd.StringDtype("pyarrow")}.get) + type_mappers.append({pa.date32(): pd.ArrowDtype(pa.date32())}.get) + type_mappers.append({pa.date64(): pd.ArrowDtype(pa.date64())}.get) + + def _convert_decimal_type(type): + if pa.types.is_decimal(type): + return pd.ArrowDtype(type) + return None + + type_mappers.append(_convert_decimal_type) + + # and then nullable types + if dtype_backend == "numpy_nullable": + type_mappers.append(PYARROW_NULLABLE_DTYPE_MAPPING.get) + elif dtype_backend == "pyarrow": + type_mappers.append(pyarrow_type_mapper) + + def default_types_mapper(pyarrow_dtype): + """Try all type mappers in order, starting from the user type mapper.""" + for type_converter in type_mappers: + converted_type = type_converter(pyarrow_dtype) + if converted_type is not None: + return converted_type + + if len(type_mappers) > 0: + return default_types_mapper + + +class ReadParquetPyarrowFS(PartitionsFiltered, BlockwiseIO): _parameters = [ "path", "columns", @@ -416,15 +486,8 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): "categories", "index", "storage_options", - "calculate_divisions", - "ignore_metadata_file", - "metadata_task_size", - "split_row_groups", - "blocksize", - "aggregate_files", - "parquet_file_extension", "filesystem", - "engine", + "ignore_metadata_file", "kwargs", "_partitions", "_series", @@ -436,15 +499,8 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): "categories": None, "index": None, "storage_options": None, - "calculate_divisions": False, - "ignore_metadata_file": False, - "metadata_task_size": None, - "split_row_groups": "infer", - "blocksize": "default", - "aggregate_files": None, - "parquet_file_extension": (".parq", ".parquet", ".pq"), - "filesystem": "fsspec", - "engine": "pyarrow", + "filesystem": None, + "ignore_metadata_file": True, "kwargs": None, "_partitions": None, "_series": False, @@ -454,17 +510,170 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): _absorb_projections = True def _tree_repr_argument_construction(self, i, op, header): - if self._parameters[i] == "_dataset_info_cache": - # Don't print this, very ugly + try: + param = self._parameters[i] + default = self._defaults[param] + except (IndexError, KeyError): + param = self._parameters[i] if i < len(self._parameters) else "" + default = "--no-default--" + if param in ("_dataset_info_cache",): return header - return super()._tree_repr_argument_construction(i, op, header) + if repr(op) != repr(default): + if param: + header += f" {param}={repr(op)}" + else: + header += repr(op) + return header + + @cached_property + def fs(self): + fs_input = self.operand("filesystem") + if isinstance(fs_input, pa.fs.FileSystem): + return fs_input + else: + # TODO: Instantiate from storage_options + raise NotImplementedError("Must provide instantiated pyarrow FileSystem") + + @cached_property + def _dataset_info(self): + # TODO: Consider breaking up this monolithic thing + if rv := self.operand("_dataset_info_cache"): + return rv + dataset_info = {} + + path_normalized = _normalize_and_strip_protocol(self.path, self.fs) + # At this point we will post a couple of HEAD request to the store which gives us some metadata about the files but no the parquet metadata + # The information included here (see pyarrow FileInfo) are size, type, + # path and modified since timestamps + # This isn't free but realtively cheap (200-300ms or less for ~1k files) + finfo = 
self.fs.get_file_info(path_normalized) + if finfo.type == pa.fs.FileType.Directory: + dataset_selector = pa_fs.FileSelector(path_normalized) + all_files = self.fs.get_file_info(dataset_selector) + else: + all_files = [finfo] + # TODO: At this point we could verify if we're dealing with a very + # inhomogeneous datasets already without reading any further data + + # TODO: Verify that tokenize works as expected and does not fall back to + # representation since that does not include timestamps + metadata_file = False + checksum = None + dataset = None + if not self.ignore_metadata_file and False: + all_files = sorted(all_files, key=lambda x: x.endswith("_metadata")) + if all_files[-1].base_name.endswith("_metadata"): + metadata_file = all_files.pop() + checksum = tokenize(metadata_file) + # TODO: dataset kwargs? + dataset = pa_ds.parquet_dataset( + metadata_file.path, + filesystem=self.fs, + ) + dataset_info["fragments"] = dataset.get_fragments() + if checksum is None: + checksum = tokenize(all_files) + dataset_info["checksum"] = checksum + if dataset is None: + import pyarrow.parquet as pq + + dataset = pq.ParquetDataset( + # TODO: This is a different API than above because the + # pa_ds.parquet_dataset only accepts the metadata file. Are + # these APIs here the correct choice? + # FIXME: parquet_dataset should accept FileInfo objects such + # that we can save ourselves a couple of head requests + [fi.path for fi in all_files], + filesystem=self.fs, + ) + dataset_info["fragments"] = dataset.fragments + # Note: We only use the dataset once to generate fragments. The + # fragments themselves can do everything we care about like splitting by + # rowgroups, filtering and reading + # dataset_info["dataset"] = dataset + dataset_info["schema"] = dataset.schema + dataset_info["base_meta"] = dataset.schema.empty_table().to_pandas() + self.operands[ + type(self)._parameters.index("_dataset_info_cache") + ] = dataset_info + return dataset_info + + def _divisions(self): + return tuple([None] * (len(self.fragments) + 1)) @property - def engine(self): - _engine = self.operand("engine") - if isinstance(_engine, str): - return get_engine(_engine) - return _engine + def _meta(self): + meta = self._dataset_info["base_meta"] + columns = _convert_to_list(self.operand("columns")) + if self._series: + assert len(columns) > 0 + return meta[columns[0]] + elif columns is not None: + return meta[columns] + return meta + + @property + def fragments(self): + return self._dataset_info["fragments"] + + @staticmethod + def _fragment_to_pandas(fragment, columns): + from dask.utils import parse_bytes + + # TODO: There should be a way for users to define the type mapper + table = fragment.to_table( + # schema=None, + columns=columns, + # filter=None, + # batch_size=131072, + # batch_readahead=16, + # fragment_readahead=4, + fragment_scan_options=pa.dataset.ParquetFragmentScanOptions( + # use_buffered_stream=False, + # buffer_size=8192, + pre_buffer=True, + cache_options=pa.CacheOptions( + # hole_size_limit=parse_bytes("8 KiB"), + # range_size_limit=parse_bytes("32.00 MiB"), + hole_size_limit=parse_bytes("4 MiB"), + range_size_limit=parse_bytes("32.00 MiB"), + # I've seen this actually slowing us down, e.g. on TPCHQ14 + lazy=False, + prefetch_limit=500, + ), + # thrift_string_size_limit=None, + # thrift_container_size_limit=None, + # decryption_config=None, + # page_checksum_verification=False, + ), + # TODO: Reconsider this. 
The OMP_NUM_THREAD variable makes it harmful to enable this + use_threads=False, + ) + df = table.to_pandas( + types_mapper=_determine_type_mapper(), + # categories=None, + # strings_to_categorical=False, + # zero_copy_only=False, + # integer_object_nulls=False, + # date_as_object=True, + # timestamp_as_object=False, + use_threads=False, + # deduplicate_objects=True, + # ignore_metadata=False, + # safe=True, + # split_blocks=False, + self_destruct=True, + # maps_as_pydicts=None, + # coerce_temporal_nanoseconds=False, + ) + return df + + def _filtered_task(self, index: int): + return ( + ReadParquetPyarrowFS._fragment_to_pandas, + self.fragments[index], + self.columns, + ) @property def columns(self): @@ -474,6 +683,41 @@ def columns(self): else: return _convert_to_list(columns_operand) + @property + def _fusion_compression_factor(self): + if self.operand("columns") is None: + return 1 + nr_original_columns = len(self._dataset_info["schema"].names) - 1 + return max( + len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 + ) + + +class ReadParquet(PartitionsFiltered, BlockwiseIO): + _parameters = [ + "path", + "columns", + "filters", + "categories", + "index", + "storage_options", + "filesystem", + "kwargs", + "_dataset_info_cache", + ] + _defaults = { + "columns": None, + "filters": None, + "categories": None, + "index": None, + "storage_options": None, + "filesystem": None, + "kwargs": None, + "_dataset_info_cache": None, + } + _pq_length_stats = None + _absorb_projections = True + def _simplify_up(self, parent, dependents): if isinstance(parent, Index): # Column projection @@ -496,6 +740,14 @@ def _simplify_up(self, parent, dependents): if _lengths: return Literal(sum(_lengths)) + @property + def columns(self): + columns_operand = self.operand("columns") + if columns_operand is None: + return list(self._meta.columns) + else: + return _convert_to_list(columns_operand) + @cached_property def _name(self): return ( @@ -508,6 +760,126 @@ def _name(self): def checksum(self): return self._dataset_info["checksum"] + def _tree_repr_argument_construction(self, i, op, header): + if self._parameters[i] == "_dataset_info_cache": + # Don't print this, very ugly + return header + return super()._tree_repr_argument_construction(i, op, header) + + @property + def _meta(self): + meta = self._dataset_info["base_meta"] + columns = _convert_to_list(self.operand("columns")) + if self._series: + assert len(columns) > 0 + return meta[columns[0]] + elif columns is not None: + return meta[columns] + return meta + + def _divisions(self): + return self._plan["divisions"] + + @abstractmethod + @cached_property + def _plan(self): + raise NotImplementedError + + @property + @abstractmethod + def _dataset_info(self): + raise NotImplementedError + + def _get_lengths(self) -> tuple | None: + """Return known partition lengths using parquet statistics""" + if not self.filters: + self._update_length_statistics() + return tuple( + length + for i, length in enumerate(self._pq_length_stats) + if not self._filtered or i in self._partitions + ) + return None + + def _update_length_statistics(self): + """Ensure that partition-length statistics are up to date""" + + if not self._pq_length_stats: + if self._plan["statistics"]: + # Already have statistics from original API call + self._pq_length_stats = tuple( + stat["num-rows"] + for i, stat in enumerate(self._plan["statistics"]) + if not self._filtered or i in self._partitions + ) + else: + # Need to go back and collect statistics + self._pq_length_stats = 
tuple( + stat["num-rows"] for stat in _collect_pq_statistics(self) + ) + + @property + def _fusion_compression_factor(self): + if self.operand("columns") is None: + return 1 + nr_original_columns = len(self._dataset_info["schema"].names) - 1 + return max( + len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 + ) + + +class ReadParquetFSSpec(ReadParquet): + """Read a parquet dataset""" + + _parameters = [ + "path", + "columns", + "filters", + "categories", + "index", + "storage_options", + "calculate_divisions", + "ignore_metadata_file", + "metadata_task_size", + "split_row_groups", + "blocksize", + "aggregate_files", + "parquet_file_extension", + "filesystem", + "engine", + "kwargs", + "_partitions", + "_series", + "_dataset_info_cache", + ] + _defaults = { + "columns": None, + "filters": None, + "categories": None, + "index": None, + "storage_options": None, + "calculate_divisions": False, + "ignore_metadata_file": False, + "metadata_task_size": None, + "split_row_groups": "infer", + "blocksize": "default", + "aggregate_files": None, + "parquet_file_extension": (".parq", ".parquet", ".pq"), + "filesystem": "fsspec", + "engine": "pyarrow", + "kwargs": None, + "_partitions": None, + "_series": False, + "_dataset_info_cache": None, + } + + @property + def engine(self): + _engine = self.operand("engine") + if isinstance(_engine, str): + return get_engine(_engine) + return _engine + @property def _dataset_info(self): if rv := self.operand("_dataset_info_cache"): @@ -612,16 +984,11 @@ def _dataset_info(self): ] = dataset_info return dataset_info - @property - def _meta(self): - meta = self._dataset_info["base_meta"] - columns = _convert_to_list(self.operand("columns")) + def _filtered_task(self, index: int): + tsk = (self._io_func, self._plan["parts"][index]) if self._series: - assert len(columns) > 0 - return meta[columns[0]] - elif columns is not None: - return meta[columns] - return meta + return (operator.getitem, tsk, self.columns[0]) + return tsk @property def _io_func(self): @@ -678,51 +1045,6 @@ def _plan(self): } return _cached_plan[dataset_token] - def _divisions(self): - return self._plan["divisions"] - - def _filtered_task(self, index: int): - tsk = (self._io_func, self._plan["parts"][index]) - if self._series: - return (operator.getitem, tsk, self.columns[0]) - return tsk - - def _get_lengths(self) -> tuple | None: - """Return known partition lengths using parquet statistics""" - if not self.filters: - self._update_length_statistics() - return tuple( - length - for i, length in enumerate(self._pq_length_stats) - if not self._filtered or i in self._partitions - ) - - def _update_length_statistics(self): - """Ensure that partition-length statistics are up to date""" - - if not self._pq_length_stats: - if self._plan["statistics"]: - # Already have statistics from original API call - self._pq_length_stats = tuple( - stat["num-rows"] - for i, stat in enumerate(self._plan["statistics"]) - if not self._filtered or i in self._partitions - ) - else: - # Need to go back and collect statistics - self._pq_length_stats = tuple( - stat["num-rows"] for stat in _collect_pq_statistics(self) - ) - - @property - def _fusion_compression_factor(self): - if self.operand("columns") is None: - return 1 - nr_original_columns = len(self._dataset_info["schema"].names) - 1 - return max( - len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 - ) - # # Helper functions @@ -1040,3 +1362,19 @@ def _read_partition_stats(part, fs, columns=None): # Helper function used 
by _extract_statistics return [_read_partition_stats(part, fs, columns=columns) for part in parts] + + +def _normalize_and_strip_protocol(path, fs): + if isinstance(fs, pa.fs.LocalFileSystem): + protos = [] + elif fs.type_name == "s3": + protos = ["s3"] + else: + raise NotImplementedError(f"FileSystem of type {fs.typ_name} unknown") + for protocol in protos: + if path.startswith(protocol + "://"): + path = path[len(protocol) + 3 :] + elif path.startswith(protocol + "::"): + path = path[len(protocol) + 2 :] + path = path.rstrip("/") + return path diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index 2f6d6660d..77070a033 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -1,6 +1,7 @@ import os import pandas as pd +import pyarrow as pa import pytest from dask.dataframe.utils import assert_eq @@ -11,10 +12,16 @@ def _make_file(dir, df=None): - fn = os.path.join(str(dir), f"myfile.{format}") + fn = os.path.join(str(dir), f"myfile.parquet") if df is None: df = pd.DataFrame({c: range(10) for c in "abcde"}) df.to_parquet(fn) + return fn + + +@pytest.fixture +def parquet_file(tmpdir): + return _make_file(tmpdir) def test_parquet_len(tmpdir): @@ -78,7 +85,7 @@ def test_predicate_pushdown(tmpdir): "e": [8, 9] * 25, } ) - fn = _make_file(tmpdir, format="parquet", df=original) + fn = _make_file(tmpdir, df=original) df = read_parquet(fn) assert_eq(df, original) x = df[df.a == 5][df.c > 20]["b"] @@ -106,7 +113,7 @@ def test_predicate_pushdown_compound(tmpdir): "e": [8, 9] * 25, } ) - fn = _make_file(tmpdir, format="parquet", df=pdf) + fn = _make_file(tmpdir, df=pdf) df = read_parquet(fn) # Test AND @@ -144,3 +151,13 @@ def test_predicate_pushdown_compound(tmpdir): assert {("c", ">", 20), ("b", "!=", 0)} in filters assert {("a", "==", 5), ("b", "!=", 0)} in filters assert_eq(y, z) + + +def test_pyarrow_filesystem(parquet_file): + from pyarrow import fs + + fs = fs.LocalFileSystem() + + df_pa = read_parquet(parquet_file, filesystem=fs) + df = read_parquet(parquet_file) + assert assert_eq(df, df_pa) From 6d8d4aa1a5b0936ea2a9b43e17fac535142c3220 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:09:27 +0100 Subject: [PATCH 03/12] Enable predicate pushdown in read_parquet --- dask_expr/io/parquet.py | 393 +++++++++++++++-------------- dask_expr/io/tests/test_parquet.py | 128 ++++------ 2 files changed, 246 insertions(+), 275 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 035f98363..f46b1e54f 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -42,17 +42,31 @@ And, Blockwise, Expr, + Filter, Index, Lengths, Literal, Or, Projection, + are_co_aligned, determine_column_projection, + is_filter_pushdown_available, ) from dask_expr._reductions import Len from dask_expr._util import _convert_to_list, _tokenize_deterministic from dask_expr.io import BlockwiseIO, PartitionsFiltered + +@normalize_token.register(pa.fs.FileInfo) +def _tokenize_fileinfo(fileinfo): + return type(fileinfo).__name__, ( + fileinfo.path, + fileinfo.size, + fileinfo.mtime_ns, + fileinfo.size, + ) + + PYARROW_NULLABLE_DTYPE_MAPPING = { pa.int8(): pd.Int8Dtype(), pa.int16(): pd.Int16Dtype(), @@ -478,7 +492,159 @@ def default_types_mapper(pyarrow_dtype): return default_types_mapper -class ReadParquetPyarrowFS(PartitionsFiltered, BlockwiseIO): +class ReadParquet(PartitionsFiltered, BlockwiseIO): + _parameters = [ + "path", + "columns", + "filters", + 
"categories", + "index", + "storage_options", + "filesystem", + "kwargs", + "_dataset_info_cache", + ] + _defaults = { + "columns": None, + "filters": None, + "categories": None, + "index": None, + "storage_options": None, + "filesystem": None, + "kwargs": None, + "_dataset_info_cache": None, + } + _pq_length_stats = None + _absorb_projections = True + _filter_passthrough = True + + def _simplify_up(self, parent, dependents): + if isinstance(parent, Index): + # Column projection + columns = determine_column_projection(self, parent, dependents) + if set(columns) == set(self.columns): + return + columns = [col for col in self.columns if col in columns] + return self.substitute_parameters({"columns": columns, "_series": False}) + + if isinstance(parent, Projection): + return super()._simplify_up(parent, dependents) + + if ( + isinstance(parent, Filter) + and isinstance(parent.predicate, (LE, GE, LT, GT, EQ, NE, And, Or)) + and is_filter_pushdown_available(self, parent, dependents) + ): + # Predicate pushdown + filters = _DNF.extract_pq_filters(self, parent.predicate) + if filters._filters is not None: + return self.substitute_parameters( + { + "filters": filters.combine( + self.operand("filters") + ).to_list_tuple() + } + ) + + if isinstance(parent, Lengths): + _lengths = self._get_lengths() + if _lengths: + return Literal(_lengths) + + if isinstance(parent, Len): + _lengths = self._get_lengths() + if _lengths: + return Literal(sum(_lengths)) + + @property + def columns(self): + columns_operand = self.operand("columns") + if columns_operand is None: + return list(self._meta.columns) + else: + return _convert_to_list(columns_operand) + + @cached_property + def _name(self): + return ( + funcname(type(self)).lower() + + "-" + + _tokenize_deterministic(self.checksum, *self.operands[:-1]) + ) + + @property + def checksum(self): + return self._dataset_info["checksum"] + + def _tree_repr_argument_construction(self, i, op, header): + if self._parameters[i] == "_dataset_info_cache": + # Don't print this, very ugly + return header + return super()._tree_repr_argument_construction(i, op, header) + + @property + def _meta(self): + meta = self._dataset_info["base_meta"] + columns = _convert_to_list(self.operand("columns")) + if self._series: + assert len(columns) > 0 + return meta[columns[0]] + elif columns is not None: + return meta[columns] + return meta + + def _divisions(self): + return self._plan["divisions"] + + @abstractmethod + @cached_property + def _plan(self): + raise NotImplementedError + + @property + @abstractmethod + def _dataset_info(self): + raise NotImplementedError + + def _get_lengths(self) -> tuple | None: + """Return known partition lengths using parquet statistics""" + if not self.filters: + self._update_length_statistics() + return tuple( + length + for i, length in enumerate(self._pq_length_stats) + if not self._filtered or i in self._partitions + ) + return None + + def _update_length_statistics(self): + """Ensure that partition-length statistics are up to date""" + + if not self._pq_length_stats: + if self._plan["statistics"]: + # Already have statistics from original API call + self._pq_length_stats = tuple( + stat["num-rows"] + for i, stat in enumerate(self._plan["statistics"]) + if not self._filtered or i in self._partitions + ) + else: + # Need to go back and collect statistics + self._pq_length_stats = tuple( + stat["num-rows"] for stat in _collect_pq_statistics(self) + ) + + @property + def _fusion_compression_factor(self): + if self.operand("columns") is None: + return 1 
+ nr_original_columns = len(self._dataset_info["schema"].names) - 1 + return max( + len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 + ) + + +class ReadParquetPyarrowFS(ReadParquet): _parameters = [ "path", "columns", @@ -509,22 +675,6 @@ class ReadParquetPyarrowFS(PartitionsFiltered, BlockwiseIO): _pq_length_stats = None _absorb_projections = True - def _tree_repr_argument_construction(self, i, op, header): - try: - param = self._parameters[i] - default = self._defaults[param] - except (IndexError, KeyError): - param = self._parameters[i] if i < len(self._parameters) else "" - default = "--no-default--" - if param in ("_dataset_info_cache",): - return header - if repr(op) != repr(default): - if param: - header += f" {param}={repr(op)}" - else: - header += repr(op) - return header - @cached_property def fs(self): fs_input = self.operand("filesystem") @@ -542,26 +692,31 @@ def _dataset_info(self): dataset_info = {} path_normalized = _normalize_and_strip_protocol(self.path, self.fs) - # At this point we will post a couple of HEAD request to the store which gives us some metadata about the files but no the parquet metadata + # At this point we will post a couple of HEAD request to the store which + # gives us some metadata about the files but no the parquet metadata # The information included here (see pyarrow FileInfo) are size, type, # path and modified since timestamps # This isn't free but realtively cheap (200-300ms or less for ~1k files) finfo = self.fs.get_file_info(path_normalized) if finfo.type == pa.fs.FileType.Directory: - dataset_selector = pa_fs.FileSelector(path_normalized) - all_files = self.fs.get_file_info(dataset_selector) + dataset_selector = pa_fs.FileSelector(path_normalized, recursive=True) + all_files = [ + finfo + for finfo in self.fs.get_file_info(dataset_selector) + if finfo.type == pa.fs.FileType.File + ] else: all_files = [finfo] # TODO: At this point we could verify if we're dealing with a very # inhomogeneous datasets already without reading any further data - # TODO: Verify that tokenize works as expected and does not fall back to - # representation since that does not include timestamps metadata_file = False checksum = None dataset = None - if not self.ignore_metadata_file and False: - all_files = sorted(all_files, key=lambda x: x.endswith("_metadata")) + if not self.ignore_metadata_file: + all_files = sorted( + all_files, key=lambda x: x.base_name.endswith("_metadata") + ) if all_files[-1].base_name.endswith("_metadata"): metadata_file = all_files.pop() checksum = tokenize(metadata_file) @@ -570,6 +725,7 @@ def _dataset_info(self): metadata_file.path, filesystem=self.fs, ) + dataset_info["using_metadata_file"] = True dataset_info["fragments"] = dataset.get_fragments() if checksum is None: checksum = tokenize(all_files) @@ -581,16 +737,17 @@ def _dataset_info(self): # TODO: This is a different API than above because the # pa_ds.parquet_dataset only accepts the metadata file. Are # these APIs here the correct choice? - # FIXME: parquet_dataset should accept FileInfo objects such - # that we can save ourselves a couple of head requests + # Just pass all_files once https://github.com/apache/arrow/pull/40143 is available to reduce latency [fi.path for fi in all_files], filesystem=self.fs, + filters=self.filters, ) + dataset_info["using_metadata_file"] = False dataset_info["fragments"] = dataset.fragments # Note: We only use the dataset once to generate fragments. 
The # fragments themselves can do everything we care about like splitting by # rowgroups, filtering and reading - # dataset_info["dataset"] = dataset + dataset_info["dataset"] = dataset dataset_info["schema"] = dataset.schema dataset_info["base_meta"] = dataset.schema.empty_table().to_pandas() self.operands[ @@ -614,17 +771,25 @@ def _meta(self): @property def fragments(self): + if self.filters is not None: + if self._dataset_info["using_metadata_file"]: + ds = self._dataset_info["dataset"] + else: + ds = self._dataset_info["dataset"]._dataset + return list(ds.get_fragments(filter=pq.filters_to_expression(self.filters))) return self._dataset_info["fragments"] @staticmethod - def _fragment_to_pandas(fragment, columns): + def _fragment_to_pandas(fragment, columns, filters, schema): from dask.utils import parse_bytes + if isinstance(filters, list): + filters = pq.filters_to_expression(filters) # TODO: There should be a way for users to define the type mapper table = fragment.to_table( - # schema=None, + schema=schema, columns=columns, - # filter=None, + filter=filters, # batch_size=131072, # batch_readahead=16, # fragment_readahead=4, @@ -647,7 +812,7 @@ def _fragment_to_pandas(fragment, columns): # page_checksum_verification=False, ), # TODO: Reconsider this. The OMP_NUM_THREAD variable makes it harmful to enable this - use_threads=False, + use_threads=True, ) df = table.to_pandas( types_mapper=_determine_type_mapper(), @@ -673,158 +838,8 @@ def _filtered_task(self, index: int): ReadParquetPyarrowFS._fragment_to_pandas, self.fragments[index], self.columns, - ) - - @property - def columns(self): - columns_operand = self.operand("columns") - if columns_operand is None: - return list(self._meta.columns) - else: - return _convert_to_list(columns_operand) - - @property - def _fusion_compression_factor(self): - if self.operand("columns") is None: - return 1 - nr_original_columns = len(self._dataset_info["schema"].names) - 1 - return max( - len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 - ) - - -class ReadParquet(PartitionsFiltered, BlockwiseIO): - _parameters = [ - "path", - "columns", - "filters", - "categories", - "index", - "storage_options", - "filesystem", - "kwargs", - "_dataset_info_cache", - ] - _defaults = { - "columns": None, - "filters": None, - "categories": None, - "index": None, - "storage_options": None, - "filesystem": None, - "kwargs": None, - "_dataset_info_cache": None, - } - _pq_length_stats = None - _absorb_projections = True - - def _simplify_up(self, parent, dependents): - if isinstance(parent, Index): - # Column projection - columns = determine_column_projection(self, parent, dependents) - if set(columns) == set(self.columns): - return - columns = [col for col in self.columns if col in columns] - return self.substitute_parameters({"columns": columns, "_series": False}) - - if isinstance(parent, Projection): - return super()._simplify_up(parent, dependents) - - if isinstance(parent, Lengths): - _lengths = self._get_lengths() - if _lengths: - return Literal(_lengths) - - if isinstance(parent, Len): - _lengths = self._get_lengths() - if _lengths: - return Literal(sum(_lengths)) - - @property - def columns(self): - columns_operand = self.operand("columns") - if columns_operand is None: - return list(self._meta.columns) - else: - return _convert_to_list(columns_operand) - - @cached_property - def _name(self): - return ( - funcname(type(self)).lower() - + "-" - + _tokenize_deterministic(self.checksum, *self.operands) - ) - - @property - def 
checksum(self): - return self._dataset_info["checksum"] - - def _tree_repr_argument_construction(self, i, op, header): - if self._parameters[i] == "_dataset_info_cache": - # Don't print this, very ugly - return header - return super()._tree_repr_argument_construction(i, op, header) - - @property - def _meta(self): - meta = self._dataset_info["base_meta"] - columns = _convert_to_list(self.operand("columns")) - if self._series: - assert len(columns) > 0 - return meta[columns[0]] - elif columns is not None: - return meta[columns] - return meta - - def _divisions(self): - return self._plan["divisions"] - - @abstractmethod - @cached_property - def _plan(self): - raise NotImplementedError - - @property - @abstractmethod - def _dataset_info(self): - raise NotImplementedError - - def _get_lengths(self) -> tuple | None: - """Return known partition lengths using parquet statistics""" - if not self.filters: - self._update_length_statistics() - return tuple( - length - for i, length in enumerate(self._pq_length_stats) - if not self._filtered or i in self._partitions - ) - return None - - def _update_length_statistics(self): - """Ensure that partition-length statistics are up to date""" - - if not self._pq_length_stats: - if self._plan["statistics"]: - # Already have statistics from original API call - self._pq_length_stats = tuple( - stat["num-rows"] - for i, stat in enumerate(self._plan["statistics"]) - if not self._filtered or i in self._partitions - ) - else: - # Need to go back and collect statistics - self._pq_length_stats = tuple( - stat["num-rows"] for stat in _collect_pq_statistics(self) - ) - - @property - def _fusion_compression_factor(self): - if self.operand("columns") is None: - return 1 - nr_original_columns = len(self._dataset_info["schema"].names) - 1 - return max( - len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 + self.filters, + self._dataset_info["schema"], ) @@ -1210,19 +1225,15 @@ def combine(self, other: _DNF | _And | _Or | list | tuple | None) -> _DNF: def extract_pq_filters(cls, pq_expr: ReadParquet, predicate_expr: Expr) -> _DNF: _filters = None if isinstance(predicate_expr, (LE, GE, LT, GT, EQ, NE)): - if ( - isinstance(predicate_expr.left, ReadParquet) - and predicate_expr.left.path == pq_expr.path - and not isinstance(predicate_expr.right, Expr) + if are_co_aligned(pq_expr, predicate_expr.left) and not isinstance( + predicate_expr.right, Expr ): op = predicate_expr._operator_repr column = predicate_expr.left.columns[0] value = predicate_expr.right _filters = (column, op, value) - elif ( - isinstance(predicate_expr.right, ReadParquet) - and predicate_expr.right.path == pq_expr.path - and not isinstance(predicate_expr.left, Expr) + elif are_co_aligned(pq_expr, predicate_expr.right) and not isinstance( + predicate_expr.left, Expr ): # Simple dict to make sure field comes first in filter flip = {LE: GE, LT: GT, GE: LE, GT: LT} diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index 77070a033..791a79dd4 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -1,18 +1,18 @@ import os import pandas as pd -import pyarrow as pa import pytest from dask.dataframe.utils import assert_eq +from pyarrow import fs -from dask_expr import from_pandas, optimize, read_parquet +from dask_expr import from_pandas, read_parquet from dask_expr._expr import Lengths, Literal from dask_expr._reductions import Len from dask_expr.io import ReadParquet def _make_file(dir, df=None): - fn = 
os.path.join(str(dir), f"myfile.parquet") + fn = os.path.join(str(dir), "myfile.parquet") if df is None: df = pd.DataFrame({c: range(10) for c in "abcde"}) df.to_parquet(fn) @@ -74,90 +74,50 @@ def test_to_parquet_engine(tmpdir): df.to_parquet(tmpdir + "engine.parquet", engine="fastparquet") -@pytest.mark.skip() -def test_predicate_pushdown(tmpdir): - original = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5] * 10, - "b": [0, 1, 2, 3, 4] * 10, - "c": range(50), - "d": [6, 7] * 25, - "e": [8, 9] * 25, - } +def test_pyarrow_filesystem(parquet_file): + filesystem = fs.LocalFileSystem() + + df_pa = read_parquet(parquet_file, filesystem=filesystem) + df = read_parquet(parquet_file) + assert assert_eq(df, df_pa) + + +def test_pyarrow_filesystem_filters(parquet_file): + filesystem = fs.LocalFileSystem() + + df_pa = read_parquet(parquet_file, filesystem=filesystem) + df_pa = df_pa[df_pa.c == 1] + expected = read_parquet( + parquet_file, filesystem=filesystem, filters=[[("c", "==", 1)]] ) - fn = _make_file(tmpdir, df=original) - df = read_parquet(fn) - assert_eq(df, original) - x = df[df.a == 5][df.c > 20]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - assert ("a", "==", 5) in y.expr.operand("filters")[0] - assert ("c", ">", 20) in y.expr.operand("filters")[0] - assert list(y.columns) == ["b"] - - # Check computed result - y_result = y.compute() - assert y_result.name == "b" - assert len(y_result) == 6 - assert (y_result == 4).all() - - -@pytest.mark.skip() -def test_predicate_pushdown_compound(tmpdir): - pdf = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5] * 10, - "b": [0, 1, 2, 3, 4] * 10, - "c": range(50), - "d": [6, 7] * 25, - "e": [8, 9] * 25, - } + assert df_pa.optimize()._name == expected.optimize()._name + assert len(df_pa.compute()) == 1 + + +def test_partition_pruning(tmpdir): + filesystem = fs.LocalFileSystem() + df = from_pandas( + pd.DataFrame( + { + "a": [1, 2, 3, 4, 5] * 10, + "b": range(50), + } + ), + npartitions=2, ) - fn = _make_file(tmpdir, df=pdf) - df = read_parquet(fn) - - # Test AND - x = df[(df.a == 5) & (df.c > 20)]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - assert {("c", ">", 20), ("a", "==", 5)} == set(y.filters[0]) - assert_eq( - y, - pdf[(pdf.a == 5) & (pdf.c > 20)]["b"], - check_index=False, + df.to_parquet(tmpdir, partition_on=["a"]) + ddf = read_parquet(tmpdir, filesystem=filesystem) + ddf_filtered = read_parquet( + tmpdir, filters=[[("a", "==", 1)]], filesystem=filesystem ) + assert ddf_filtered.npartitions == ddf.npartitions // 5 - # Test OR - x = df[(df.a == 5) | (df.c > 20)] - x = x[x.b != 0]["b"] - y = optimize(x, fuse=False) - assert isinstance(y.expr, ReadParquet) - filters = [set(y.filters[0]), set(y.filters[1])] - assert {("c", ">", 20), ("b", "!=", 0)} in filters - assert {("a", "==", 5), ("b", "!=", 0)} in filters - expect = pdf[(pdf.a == 5) | (pdf.c > 20)] - expect = expect[expect.b != 0]["b"] + ddf_optimize = read_parquet(tmpdir, filesystem=filesystem) + ddf_optimize = ddf_optimize[ddf_optimize.a == 1].optimize() + assert ddf_optimize.npartitions == ddf.npartitions // 5 assert_eq( - y, - expect, - check_index=False, + ddf_filtered, + ddf_optimize, + # FIXME ? 
+ check_names=False, ) - - # Test OR and AND - x = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"] - z = optimize(x, fuse=False) - assert isinstance(z.expr, ReadParquet) - filters = [set(z.filters[0]), set(z.filters[1])] - assert {("c", ">", 20), ("b", "!=", 0)} in filters - assert {("a", "==", 5), ("b", "!=", 0)} in filters - assert_eq(y, z) - - -def test_pyarrow_filesystem(parquet_file): - from pyarrow import fs - - fs = fs.LocalFileSystem() - - df_pa = read_parquet(parquet_file, filesystem=fs) - df = read_parquet(parquet_file) - assert assert_eq(df, df_pa) From b9f82ec9ba25fa3cbd0e87306392fea436584557 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 21 Feb 2024 11:13:14 +0100 Subject: [PATCH 04/12] refine filter passthrough --- dask_expr/io/parquet.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index f46b1e54f..bfa846cd4 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -50,7 +50,6 @@ Projection, are_co_aligned, determine_column_projection, - is_filter_pushdown_available, ) from dask_expr._reductions import Len from dask_expr._util import _convert_to_list, _tokenize_deterministic @@ -518,6 +517,13 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): _absorb_projections = True _filter_passthrough = True + def _filter_passthrough_available(self, parent, dependents): + return ( + super()._filter_passthrough_available(parent, dependents) + and (isinstance(parent.predicate, (LE, GE, LT, GT, EQ, NE, And, Or))) + and _DNF.extract_pq_filters(self, parent.predicate)._filters is not None + ) + def _simplify_up(self, parent, dependents): if isinstance(parent, Index): # Column projection @@ -530,10 +536,8 @@ def _simplify_up(self, parent, dependents): if isinstance(parent, Projection): return super()._simplify_up(parent, dependents) - if ( - isinstance(parent, Filter) - and isinstance(parent.predicate, (LE, GE, LT, GT, EQ, NE, And, Or)) - and is_filter_pushdown_available(self, parent, dependents) + if isinstance(parent, Filter) and self._filter_passthrough_available( + parent, dependents ): # Predicate pushdown filters = _DNF.extract_pq_filters(self, parent.predicate) From e5a0333cd38bcd90cf4c8ecd498fe680119e9d73 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 22 Feb 2024 15:03:45 +0100 Subject: [PATCH 05/12] fix test_io_fusion_blockwise --- dask_expr/io/tests/test_io.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py index f28772594..a76bf13a0 100644 --- a/dask_expr/io/tests/test_io.py +++ b/dask_expr/io/tests/test_io.py @@ -124,7 +124,8 @@ def test_read_csv_keywords(tmpdir): def test_io_fusion_blockwise(tmpdir): pdf = pd.DataFrame({c: range(10) for c in "abcdefghijklmn"}) dd.from_pandas(pdf, 3).to_parquet(tmpdir) - df = read_parquet(tmpdir)["a"].fillna(10).optimize() + read_parq = read_parquet(tmpdir) + df = read_parq["a"].fillna(10).optimize() assert df.npartitions == 2 assert len(df.__dask_graph__()) == 2 graph = ( @@ -133,7 +134,9 @@ def test_io_fusion_blockwise(tmpdir): .optimize(fuse=False) .__dask_graph__() ) - assert any("readparquet-fused" in key[0] for key in graph.keys()) + assert any( + f"{read_parq._expr._name.split('-')[0]}-fused" in key[0] for key in graph.keys() + ) def test_repartition_io_fusion_blockwise(tmpdir): From ffabe4fb6adcf42c4a11695e15fff08de1ccccbd Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 22 Feb 2024 16:00:18 +0100 Subject: [PATCH 06/12] cleanup --- 
dask_expr/io/parquet.py | 104 +++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 59 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index bfa846cd4..8ae677187 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -4,7 +4,6 @@ import itertools import operator import warnings -from abc import abstractmethod from collections import defaultdict from functools import cached_property @@ -437,13 +436,6 @@ def to_parquet( return out -# class ReadParquetPyarrowFSNoMeta(ReadParquetPyarrowFS): - -# def _lower(self): -# return ReadParquetPyarrowFS( - - -# ) def _determine_type_mapper( *, user_types_mapper=None, dtype_backend=None, convert_string=True ): @@ -515,7 +507,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): } _pq_length_stats = None _absorb_projections = True - _filter_passthrough = True + _filter_passthrough = False def _filter_passthrough_available(self, parent, dependents): return ( @@ -598,46 +590,8 @@ def _meta(self): return meta def _divisions(self): - return self._plan["divisions"] - - @abstractmethod - @cached_property - def _plan(self): raise NotImplementedError - @property - @abstractmethod - def _dataset_info(self): - raise NotImplementedError - - def _get_lengths(self) -> tuple | None: - """Return known partition lengths using parquet statistics""" - if not self.filters: - self._update_length_statistics() - return tuple( - length - for i, length in enumerate(self._pq_length_stats) - if not self._filtered or i in self._partitions - ) - return None - - def _update_length_statistics(self): - """Ensure that partition-length statistics are up to date""" - - if not self._pq_length_stats: - if self._plan["statistics"]: - # Already have statistics from original API call - self._pq_length_stats = tuple( - stat["num-rows"] - for i, stat in enumerate(self._plan["statistics"]) - if not self._filtered or i in self._partitions - ) - else: - # Need to go back and collect statistics - self._pq_length_stats = tuple( - stat["num-rows"] for stat in _collect_pq_statistics(self) - ) - @property def _fusion_compression_factor(self): if self.operand("columns") is None: @@ -678,6 +632,7 @@ class ReadParquetPyarrowFS(ReadParquet): } _pq_length_stats = None _absorb_projections = True + _filter_passthrough = True @cached_property def fs(self): @@ -685,19 +640,22 @@ def fs(self): if isinstance(fs_input, pa.fs.FileSystem): return fs_input else: - # TODO: Instantiate from storage_options - raise NotImplementedError("Must provide instantiated pyarrow FileSystem") + fs = pa_fs.FileSystem.from_uri(self.path)[0] + if storage_options := self.storage_options: + # Use inferred region as the default + region = {} if "region" in storage_options else {"region": fs.region} + fs = type(fs)(**region, **storage_options) + return fs @cached_property def _dataset_info(self): - # TODO: Consider breaking up this monolithic thing if rv := self.operand("_dataset_info_cache"): return rv dataset_info = {} path_normalized = _normalize_and_strip_protocol(self.path, self.fs) - # At this point we will post a couple of HEAD request to the store which - # gives us some metadata about the files but no the parquet metadata + # At this point we will post a couple of listbucket operations which + # includes the same data as a HEAD request. 
# The information included here (see pyarrow FileInfo) are size, type, # path and modified since timestamps # This isn't free but realtively cheap (200-300ms or less for ~1k files) @@ -738,19 +696,16 @@ def _dataset_info(self): import pyarrow.parquet as pq dataset = pq.ParquetDataset( - # TODO: This is a different API than above because the - # pa_ds.parquet_dataset only accepts the metadata file. Are - # these APIs here the correct choice? - # Just pass all_files once https://github.com/apache/arrow/pull/40143 is available to reduce latency + # TODO Just pass all_files once + # https://github.com/apache/arrow/pull/40143 is available to + # reduce latency [fi.path for fi in all_files], filesystem=self.fs, filters=self.filters, ) dataset_info["using_metadata_file"] = False dataset_info["fragments"] = dataset.fragments - # Note: We only use the dataset once to generate fragments. The - # fragments themselves can do everything we care about like splitting by - # rowgroups, filtering and reading + dataset_info["dataset"] = dataset dataset_info["schema"] = dataset.schema dataset_info["base_meta"] = dataset.schema.empty_table().to_pandas() @@ -899,6 +854,9 @@ def engine(self): return get_engine(_engine) return _engine + def _divisions(self): + return self._plan["divisions"] + @property def _dataset_info(self): if rv := self.operand("_dataset_info_cache"): @@ -1064,6 +1022,34 @@ def _plan(self): } return _cached_plan[dataset_token] + def _get_lengths(self) -> tuple | None: + """Return known partition lengths using parquet statistics""" + if not self.filters: + self._update_length_statistics() + return tuple( + length + for i, length in enumerate(self._pq_length_stats) + if not self._filtered or i in self._partitions + ) + return None + + def _update_length_statistics(self): + """Ensure that partition-length statistics are up to date""" + + if not self._pq_length_stats: + if self._plan["statistics"]: + # Already have statistics from original API call + self._pq_length_stats = tuple( + stat["num-rows"] + for i, stat in enumerate(self._plan["statistics"]) + if not self._filtered or i in self._partitions + ) + else: + # Need to go back and collect statistics + self._pq_length_stats = tuple( + stat["num-rows"] for stat in _collect_pq_statistics(self) + ) + # # Helper functions From d608d8d8f57aae816e3697e90340abafc23a0c65 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 22 Feb 2024 16:17:08 +0100 Subject: [PATCH 07/12] deal with None blocksize --- dask_expr/_collection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index af7a376c3..a617fefa7 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -4641,7 +4641,7 @@ def read_parquet( raise NotImplementedError( "split_row_groups is not supported when using the pyarrow filesystem." ) - if blocksize != "default": + if blocksize is not None and blocksize != "default": raise NotImplementedError( "blocksize is not supported when using the pyarrow filesystem." 
) From 13f2584dd86ab13380f02e26fa63c2f7318ecc35 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 23 Feb 2024 14:17:04 +0100 Subject: [PATCH 08/12] set batch size to very large --- dask_expr/io/parquet.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 8ae677187..05b66d5db 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -749,7 +749,14 @@ def _fragment_to_pandas(fragment, columns, filters, schema): schema=schema, columns=columns, filter=filters, - # batch_size=131072, + # Batch size determines how many rows are read at once and will + # cause the underlying array to be split into chunks of this size + # (max). We'd like to avoid fragmentation as much as possible and + # to set this to something like inf, but we have to set a finite, + # positive number. + # In the presence of row groups, the underlying array will still be + # chunked per rowgroup + batch_size=10_000_000, # batch_readahead=16, # fragment_readahead=4, fragment_scan_options=pa.dataset.ParquetFragmentScanOptions( @@ -763,6 +770,7 @@ def _fragment_to_pandas(fragment, columns, filters, schema): range_size_limit=parse_bytes("32.00 MiB"), # I've seen this actually slowing us down, e.g. on TPCHQ14 lazy=False, + # If we disable lazy we can remove this as well. prefetch_limit=500, ), # thrift_string_size_limit=None, From 3960ee3e30e075f01fce1badc6c5620b36c0dbef Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 26 Feb 2024 15:06:39 +0100 Subject: [PATCH 09/12] remove commented code --- dask_expr/io/parquet.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 05b66d5db..576524c94 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -760,43 +760,19 @@ def _fragment_to_pandas(fragment, columns, filters, schema): # batch_readahead=16, # fragment_readahead=4, fragment_scan_options=pa.dataset.ParquetFragmentScanOptions( - # use_buffered_stream=False, - # buffer_size=8192, pre_buffer=True, cache_options=pa.CacheOptions( - # hole_size_limit=parse_bytes("8 KiB"), - # range_size_limit=parse_bytes("32.00 MiB"), hole_size_limit=parse_bytes("4 MiB"), range_size_limit=parse_bytes("32.00 MiB"), - # I've seen this actually slowing us down, e.g. on TPCHQ14 - lazy=False, - # If we disable lazy we can remove this as well. - prefetch_limit=500, ), - # thrift_string_size_limit=None, - # thrift_container_size_limit=None, - # decryption_config=None, - # page_checksum_verification=False, ), # TODO: Reconsider this.
The OMP_NUM_THREAD variable makes it harmful to enable this use_threads=True, ) df = table.to_pandas( types_mapper=_determine_type_mapper(), - # categories=None, - # strings_to_categorical=False, - # zero_copy_only=False, - # integer_object_nulls=False, - # date_as_object=True, - # timestamp_as_object=False, use_threads=False, - # deduplicate_objects=True, - # ignore_metadata=False, - # safe=True, - # split_blocks=False, self_destruct=True, - # maps_as_pydicts=None, - # coerce_temporal_nanoseconds=False, ) return df From d59627160cae8e00365b6a78096ed14551b47a85 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 27 Feb 2024 13:09:07 +0100 Subject: [PATCH 10/12] remove duplicate code --- dask_expr/io/parquet.py | 34 ++-------------------------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 576524c94..edf886565 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -4,6 +4,7 @@ import itertools import operator import warnings +from abc import abstractmethod from collections import defaultdict from functools import cached_property @@ -484,27 +485,6 @@ def default_types_mapper(pyarrow_dtype): class ReadParquet(PartitionsFiltered, BlockwiseIO): - _parameters = [ - "path", - "columns", - "filters", - "categories", - "index", - "storage_options", - "filesystem", - "kwargs", - "_dataset_info_cache", - ] - _defaults = { - "columns": None, - "filters": None, - "categories": None, - "index": None, - "storage_options": None, - "filesystem": None, - "kwargs": None, - "_dataset_info_cache": None, - } _pq_length_stats = None _absorb_projections = True _filter_passthrough = False @@ -589,6 +569,7 @@ def _meta(self): return meta[columns] return meta + @abstractmethod def _divisions(self): raise NotImplementedError @@ -717,17 +698,6 @@ def _dataset_info(self): def _divisions(self): return tuple([None] * (len(self.fragments) + 1)) - @property - def _meta(self): - meta = self._dataset_info["base_meta"] - columns = _convert_to_list(self.operand("columns")) - if self._series: - assert len(columns) > 0 - return meta[columns[0]] - elif columns is not None: - return meta[columns] - return meta - @property def fragments(self): if self.filters is not None: From c3bc0200d485d983fb7969be9292675fd9122939 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 27 Feb 2024 13:11:48 +0100 Subject: [PATCH 11/12] enforce pyarrow 15 --- dask_expr/_collection.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 5326c0d35..f03149bf6 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -12,6 +12,7 @@ import dask.dataframe.methods as methods import numpy as np import pandas as pd +import pyarrow as pa from dask import compute, delayed from dask.array import Array from dask.base import DaskMethodsMixin, is_dask_collection, named_schedulers @@ -57,10 +58,12 @@ ) from dask.widgets import get_template from fsspec.utils import stringify_path +from packaging.version import parse as parse_version from pandas import CategoricalDtype from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype from pandas.api.types import is_scalar as pd_is_scalar from pandas.api.types import is_timedelta64_dtype +from pyarrow import fs as pa_fs from tlz import first import dask_expr._backends # noqa: F401 @@ -4642,13 +4645,16 @@ def read_parquet( col, op, val = filter if op == "in" and not isinstance(val, (set, list, tuple)): raise 
TypeError("Value of 'in' filter must be a list, set or tuple.") - from pyarrow import fs as pa_fs if ( isinstance(filesystem, pa_fs.FileSystem) or isinstance(filesystem, str) and filesystem.lower() in ("arrow", "pyarrow") ): + if parse_version(pa.__version__) <= parse_version("15.0.0"): + raise ValueError( + "pyarrow>=15.0.0 is required to use the pyarrow filesystem." + ) if calculate_divisions: raise NotImplementedError( "calculate_divisions is not supported when using the pyarrow filesystem." From 58084cb39714739aa69d0ff05962d7d38c9b4598 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 27 Feb 2024 13:20:44 +0100 Subject: [PATCH 12/12] remove path normalization code --- dask_expr/_collection.py | 2 +- dask_expr/io/parquet.py | 22 +++++----------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index f03149bf6..2094e7df2 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -4651,7 +4651,7 @@ def read_parquet( or isinstance(filesystem, str) and filesystem.lower() in ("arrow", "pyarrow") ): - if parse_version(pa.__version__) <= parse_version("15.0.0"): + if parse_version(pa.__version__) < parse_version("15.0.0"): raise ValueError( "pyarrow>=15.0.0 is required to use the pyarrow filesystem." ) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index edf886565..a3e0a21ec 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -615,6 +615,10 @@ class ReadParquetPyarrowFS(ReadParquet): _absorb_projections = True _filter_passthrough = True + @cached_property + def normalized_path(self): + return pa_fs.FileSystem.from_uri(self.path)[1] + @cached_property def fs(self): fs_input = self.operand("filesystem") @@ -634,7 +638,7 @@ def _dataset_info(self): return rv dataset_info = {} - path_normalized = _normalize_and_strip_protocol(self.path, self.fs) + path_normalized = self.normalized_path # At this point we will post a couple of listbucket operations which # includes the same data as a HEAD request. # The information included here (see pyarrow FileInfo) are size, type, @@ -1317,19 +1321,3 @@ def _read_partition_stats(part, fs, columns=None): # Helper function used by _extract_statistics return [_read_partition_stats(part, fs, columns=columns) for part in parts] - - -def _normalize_and_strip_protocol(path, fs): - if isinstance(fs, pa.fs.LocalFileSystem): - protos = [] - elif fs.type_name == "s3": - protos = ["s3"] - else: - raise NotImplementedError(f"FileSystem of type {fs.typ_name} unknown") - for protocol in protos: - if path.startswith(protocol + "://"): - path = path[len(protocol) + 3 :] - elif path.startswith(protocol + "::"): - path = path[len(protocol) + 2 :] - path = path.rstrip("/") - return path