dask_expr/_collection.py (64 changes: 62 additions & 2 deletions)
@@ -12,6 +12,7 @@
 import dask.dataframe.methods as methods
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 from dask import compute, delayed
 from dask.array import Array
 from dask.base import DaskMethodsMixin, is_dask_collection, named_schedulers
@@ -57,10 +58,12 @@
 )
 from dask.widgets import get_template
 from fsspec.utils import stringify_path
+from packaging.version import parse as parse_version
 from pandas import CategoricalDtype
 from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
 from pandas.api.types import is_scalar as pd_is_scalar
 from pandas.api.types import is_timedelta64_dtype
+from pyarrow import fs as pa_fs
 from tlz import first

 import dask_expr._backends  # noqa: F401
@@ -4626,7 +4629,11 @@ def read_parquet(
     engine=None,
     **kwargs,
 ):
-    from dask_expr.io.parquet import ReadParquet, _set_parquet_engine
+    from dask_expr.io.parquet import (
+        ReadParquetFSSpec,
+        ReadParquetPyarrowFS,
+        _set_parquet_engine,
+    )

     if not isinstance(path, str):
         path = stringify_path(path)
@@ -4639,8 +4646,61 @@
             if op == "in" and not isinstance(val, (set, list, tuple)):
                 raise TypeError("Value of 'in' filter must be a list, set or tuple.")

+    if (
+        isinstance(filesystem, pa_fs.FileSystem)
+        or isinstance(filesystem, str)
+        and filesystem.lower() in ("arrow", "pyarrow")
+    ):

[Review thread on the `isinstance(filesystem, pa_fs.FileSystem)` line]
Collaborator: do you want to make those configs raise or should we rather fall back to the old implementation?
Member (author): The old implementation is currently pretty broken so I don't want to fall back, see #856
+        if parse_version(pa.__version__) < parse_version("15.0.0"):
+            raise ValueError(
+                "pyarrow>=15.0.0 is required to use the pyarrow filesystem."
+            )
+        if calculate_divisions:
+            raise NotImplementedError(
+                "calculate_divisions is not supported when using the pyarrow filesystem."
+            )
+        if metadata_task_size is not None:
+            raise NotImplementedError(
+                "metadata_task_size is not supported when using the pyarrow filesystem."
+            )
+        if split_row_groups != "infer":
+            raise NotImplementedError(
+                "split_row_groups is not supported when using the pyarrow filesystem."
+            )
+        if blocksize is not None and blocksize != "default":
+            raise NotImplementedError(
+                "blocksize is not supported when using the pyarrow filesystem."
+            )
+        if aggregate_files is not None:
+            raise NotImplementedError(
+                "aggregate_files is not supported when using the pyarrow filesystem."
+            )
+        if parquet_file_extension != (".parq", ".parquet", ".pq"):
+            raise NotImplementedError(
+                "parquet_file_extension is not supported when using the pyarrow filesystem."
+            )
+        if engine is not None:
+            raise NotImplementedError(
+                "engine is not supported when using the pyarrow filesystem."
+            )
+
+        return new_collection(
+            ReadParquetPyarrowFS(
+                path,
+                columns=_convert_to_list(columns),
+                filters=filters,
+                categories=categories,
+                index=index,
+                storage_options=storage_options,
+                filesystem=filesystem,
+                ignore_metadata_file=ignore_metadata_file,
+                kwargs=kwargs,
+                _series=isinstance(columns, str),
+            )
+        )
+
     return new_collection(
-        ReadParquet(
+        ReadParquetFSSpec(
             path,
             columns=_convert_to_list(columns),
             filters=filters,
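
For reference, a minimal usage sketch of the new dispatch. It assumes the public dask_expr.read_parquet entry point; the local path and toy data are illustrative only:

import pandas as pd
import pyarrow.fs as pa_fs

import dask_expr as dx

# Write a tiny parquet file to read back.
pd.DataFrame({"a": [1, 2, 3]}).to_parquet("/tmp/example.parquet")

# A pyarrow FileSystem instance satisfies the isinstance check and routes to
# ReadParquetPyarrowFS. Note `and` binds tighter than `or`, so the gate reads:
# FileSystem instance, or (string and alias in {"arrow", "pyarrow"}).
df = dx.read_parquet("/tmp/example.parquet", filesystem=pa_fs.LocalFileSystem())

# The case-insensitive string alias takes the same path ...
df = dx.read_parquet("/tmp/example.parquet", filesystem="pyarrow")

# ... while any other filesystem value keeps the existing fsspec path
# (ReadParquetFSSpec).
df = dx.read_parquet("/tmp/example.parquet")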
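
And, per the review thread above, unsupported options raise instead of falling back to the old fsspec implementation (continuing the same sketch; the error message is taken from the patch):

# pyarrow < 15.0.0 raises ValueError up front; unsupported keywords raise
# NotImplementedError rather than silently degrading to the fsspec path.
try:
    dx.read_parquet(
        "/tmp/example.parquet",
        filesystem="pyarrow",
        calculate_divisions=True,
    )
except NotImplementedError as err:
    print(err)  # calculate_divisions is not supported when using the pyarrow filesystem.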