diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 80b0ba29166..909b122c062 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -1118,6 +1118,7 @@ async def test_shuffling(c, s, a, b): ss = Shuffling(s) df = dask.datasets.timeseries() + df["name"] = df["name"].astype("string[python]") df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist() start = time() while not ss.source.data["disk_read"]: diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index f0dc722ece3..bf1aae981ba 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -4,9 +4,30 @@ from typing import TYPE_CHECKING, BinaryIO if TYPE_CHECKING: + import pandas as pd import pyarrow as pa +def check_dtype_support(meta_input: pd.DataFrame) -> None: + import pandas as pd + + for name in meta_input: + column = meta_input[name] + # FIXME: PyArrow does not support complex numbers: https://issues.apache.org/jira/browse/ARROW-638 + if pd.api.types.is_complex_dtype(column): + raise TypeError( + f"p2p does not support data of type '{column.dtype}' found in column '{name}'." + ) + # FIXME: Serializing custom objects to PyArrow is not supported in P2P shuffling + if pd.api.types.is_object_dtype(column): + raise TypeError( + f"p2p does not support custom objects found in column '{name}'." + ) + # FIXME: PyArrow does not support sparse data: https://issues.apache.org/jira/browse/ARROW-8679 + if pd.api.types.is_sparse(column): + raise TypeError("p2p does not support sparse data found in column '{name}'") + + def dump_shards(shards: list[bytes], file: BinaryIO) -> None: """ Write multiple shard tables to the file diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 8aa0adf7072..b1eee5bc052 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -7,6 +7,8 @@ from dask.highlevelgraph import HighLevelGraph from dask.layers import SimpleShuffleLayer +from distributed.shuffle._arrow import check_dtype_support + logger = logging.getLogger("distributed.shuffle") if TYPE_CHECKING: import pandas as pd @@ -94,6 +96,7 @@ def rearrange_by_column_p2p( ) -> DataFrame: from dask.dataframe import DataFrame + check_dtype_support(df._meta) npartitions = npartitions or df.npartitions token = tokenize(df, column, npartitions) @@ -103,12 +106,6 @@ def rearrange_by_column_p2p( raise TypeError( f"p2p requires all column names to be str, found: {unsupported}", ) - for c, dt in empty.dtypes.items(): - if dt == object: - empty[c] = empty[c].astype( - "string" - ) # TODO: we fail at non-string object dtypes - empty[column] = empty[column].astype("int64") # TODO: this shouldn't be necesssary name = f"shuffle-p2p-{token}" layer = P2PShuffleLayer( diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py index 4f2d6add1fd..6f1299282f9 100644 --- a/distributed/shuffle/tests/test_graph.py +++ b/distributed/shuffle/tests/test_graph.py @@ -29,6 +29,38 @@ def test_basic(client): # ^ NOTE: this works because `assert_eq` sorts the rows before comparing +@pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"]) +def test_raise_on_complex_numbers(dtype): + df = dd.from_pandas( + pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5 + ) + with pytest.raises( + TypeError, match=f"p2p does not support data of type '{df.x.dtype}'" + ): + df.shuffle("x", shuffle="p2p") + + +def test_raise_on_custom_objects(): + class Stub: + def __init__(self, value: int) -> None: + self.value = value + + df = dd.from_pandas( + pd.DataFrame({"x": pd.array([Stub(i) for i in range(10)], dtype="object")}), + npartitions=5, + ) + with pytest.raises(TypeError, match="p2p does not support custom objects"): + df.shuffle("x", shuffle="p2p") + + +def test_raise_on_sparse_data(): + df = dd.from_pandas( + pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5 + ) + with pytest.raises(TypeError, match="p2p does not support sparse data"): + df.shuffle("x", shuffle="p2p") + + def test_raise_on_non_string_column_name(): df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5) with pytest.raises(TypeError, match="p2p requires all column names to be str"): @@ -43,6 +75,7 @@ def test_does_not_raise_on_stringified_numeric_column_name(): @gen_cluster([("", 2)] * 4, client=True) async def test_basic_state(c, s, *workers): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") + df["name"] = df["name"].astype("string[python]") shuffled = df.shuffle("id", shuffle="p2p") exts = [w.extensions["shuffle"] for w in workers]