From 819b74b89d8c5b1ca3fa5dcce26164b7961ed02f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 20 Dec 2022 16:52:26 +0100 Subject: [PATCH 1/5] Check fo supported dtypes and raise --- distributed/shuffle/_arrow.py | 21 ++++++++++++++++ distributed/shuffle/_shuffle.py | 17 +++++++------ distributed/shuffle/tests/test_graph.py | 33 +++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index f7531020274..c7c97a94c2f 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -3,9 +3,30 @@ from typing import TYPE_CHECKING, BinaryIO if TYPE_CHECKING: + import pandas as pd import pyarrow as pa +def check_dtype_support(meta_input: pd.DataFrame) -> None: + import pandas as pd + + for name in meta_input.columns: + column = meta_input[name] + # FIXME: PyArrow does not support complex numbers: https://issues.apache.org/jira/browse/ARROW-638 + if pd.api.types.is_complex_dtype(column.dtype): + raise TypeError( + f"p2p does not support data of type '{column.dtype}' found in column '{name}'." + ) + # FIXME: Serializing custom objects to PyArrow is not supported in P2P shuffling + if pd.api.types.is_object_dtype(column.dtype): + raise TypeError( + f"p2p does not support custom objects found in column '{name}'." + ) + # FIXME: PyArrow does not support sparse data: https://issues.apache.org/jira/browse/ARROW-8679 + if pd.api.types.is_sparse(column): + raise TypeError("p2p does not support sparse data found in column '{name}'") + + def dump_batch(batch: pa.Buffer, file: BinaryIO, schema: pa.Schema) -> None: """ Dump a batch to file, if we're the first, also write the schema diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 7d7f5bf1229..d666afe40af 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -7,6 +7,8 @@ from dask.highlevelgraph import HighLevelGraph from dask.layers import SimpleShuffleLayer +from distributed.shuffle._arrow import check_dtype_support + logger = logging.getLogger("distributed.shuffle") if TYPE_CHECKING: import pandas as pd @@ -79,13 +81,14 @@ def rearrange_by_column_p2p( token = tokenize(df, column, npartitions) empty = df._meta.copy() - for c, dt in empty.dtypes.items(): - if dt == object: - empty[c] = empty[c].astype( - "string" - ) # TODO: we fail at non-string object dtypes - empty[column] = empty[column].astype("int64") # TODO: this shouldn't be necesssary - + # for c, dt in empty.dtypes.items(): + # if dt == object: + # empty[c] = empty[c].astype( + # "string" + # ) # TODO: we fail at non-string object dtypes + # empty[column] = empty[column].astype("int64") # TODO: this shouldn't be necesssary + + check_dtype_support(empty) name = f"shuffle-p2p-{token}" layer = P2PShuffleLayer( name, diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py index e7d3f0f77db..668b8384428 100644 --- a/distributed/shuffle/tests/test_graph.py +++ b/distributed/shuffle/tests/test_graph.py @@ -29,9 +29,42 @@ def test_basic(client): # ^ NOTE: this works because `assert_eq` sorts the rows before comparing +@pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"]) +def test_raise_on_complex_numbers(dtype): + df = dd.from_pandas( + pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5 + ) + with pytest.raises( + TypeError, match=f"p2p does not support data of type '{df.x.dtype}'" + ): + df.shuffle("x", shuffle="p2p") + + +def test_raise_on_custom_objects(): + class Stub: + def __init__(self, value: int) -> None: + self.value = value + + df = dd.from_pandas( + pd.DataFrame({"x": pd.array([Stub(i) for i in range(10)], dtype="object")}), + npartitions=5, + ) + with pytest.raises(TypeError, match="p2p does not support custom objects"): + df.shuffle("x", shuffle="p2p") + + +def test_raise_on_sparse_data(): + df = dd.from_pandas( + pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5 + ) + with pytest.raises(TypeError, match="p2p does not support sparse data"): + df.shuffle("x", shuffle="p2p") + + @gen_cluster([("", 2)] * 4, client=True) async def test_basic_state(c, s, *workers): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") + df["name"] = df["name"].astype("string[python]") shuffled = df.shuffle("id", shuffle="p2p") exts = [w.extensions["shuffle"] for w in workers] From 3e0d0127f5565ab0b70d127a321c0696dee88baf Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 20 Dec 2022 16:54:21 +0100 Subject: [PATCH 2/5] Remove commented-out code --- distributed/shuffle/_shuffle.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index d666afe40af..890462e4ec1 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -81,12 +81,6 @@ def rearrange_by_column_p2p( token = tokenize(df, column, npartitions) empty = df._meta.copy() - # for c, dt in empty.dtypes.items(): - # if dt == object: - # empty[c] = empty[c].astype( - # "string" - # ) # TODO: we fail at non-string object dtypes - # empty[column] = empty[column].astype("int64") # TODO: this shouldn't be necesssary check_dtype_support(empty) name = f"shuffle-p2p-{token}" From 00e174ba2eeed923b1d1c67e852a486a9594f45e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 20 Dec 2022 17:12:55 +0100 Subject: [PATCH 3/5] Simplify --- distributed/shuffle/_arrow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index c7c97a94c2f..ffd0ac9f4f4 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -10,15 +10,15 @@ def check_dtype_support(meta_input: pd.DataFrame) -> None: import pandas as pd - for name in meta_input.columns: + for name in meta_input: column = meta_input[name] # FIXME: PyArrow does not support complex numbers: https://issues.apache.org/jira/browse/ARROW-638 - if pd.api.types.is_complex_dtype(column.dtype): + if pd.api.types.is_complex_dtype(column): raise TypeError( f"p2p does not support data of type '{column.dtype}' found in column '{name}'." ) # FIXME: Serializing custom objects to PyArrow is not supported in P2P shuffling - if pd.api.types.is_object_dtype(column.dtype): + if pd.api.types.is_object_dtype(column): raise TypeError( f"p2p does not support custom objects found in column '{name}'." ) From 23e04dd8ae75301b7c6991bbdafccf25cf4b4ad0 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 20 Dec 2022 17:59:01 +0100 Subject: [PATCH 4/5] Move check --- distributed/shuffle/_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 890462e4ec1..a71683fe403 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -77,12 +77,12 @@ def rearrange_by_column_p2p( ) -> DataFrame: from dask.dataframe import DataFrame + check_dtype_support(df._meta) npartitions = npartitions or df.npartitions token = tokenize(df, column, npartitions) empty = df._meta.copy() - check_dtype_support(empty) name = f"shuffle-p2p-{token}" layer = P2PShuffleLayer( name, From a68e59e26719a5056deda1962eddfd35444d3b31 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 20 Dec 2022 18:59:04 +0100 Subject: [PATCH 5/5] Fix test --- distributed/dashboard/tests/test_scheduler_bokeh.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index efe71ea7636..b81f2c2ed42 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -1119,6 +1119,7 @@ async def test_shuffling(c, s, a, b): ss = Shuffling(s) df = dask.datasets.timeseries() + df["name"] = df["name"].astype("string[python]") df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist() start = time() while not ss.source.data["disk_read"]: