diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py
index e6e9c2fdeb0..ef171425ead 100644
--- a/distributed/shuffle/_shuffle.py
+++ b/distributed/shuffle/_shuffle.py
@@ -104,9 +104,15 @@ def rearrange_by_column_p2p(
     column: str,
     npartitions: int | None = None,
 ) -> DataFrame:
+    import pandas as pd
+
     from dask.dataframe.core import new_dd_object
 
     meta = df._meta
+    if not pd.api.types.is_integer_dtype(meta[column].dtype):
+        raise TypeError(
+            f"Expected meta {column=} to be an integer column, is {meta[column].dtype}."
+        )
     check_dtype_support(meta)
     npartitions = npartitions or df.npartitions
     token = tokenize(df, column, npartitions)
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index fc78ef1fda1..5284d034ff8 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -2165,3 +2165,45 @@ async def test_set_index_p2p(c, s, *workers):
     await c.close()
     await asyncio.gather(*[check_worker_cleanup(w) for w in workers])
     await check_scheduler_cleanup(s)
+
+
+def test_shuffle_p2p_with_existing_index(client):
+    df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20))
+    ddf = dd.from_pandas(
+        df,
+        npartitions=4,
+    )
+    ddf = ddf.shuffle("a", shuffle="p2p")
+    result = client.compute(ddf, sync=True)
+    dd.assert_eq(result, df)
+
+
+def test_set_index_p2p_with_existing_index(client):
+    df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20))
+    ddf = dd.from_pandas(
+        df,
+        npartitions=4,
+    )
+    ddf = ddf.set_index("a", shuffle="p2p")
+    result = client.compute(ddf, sync=True)
+    dd.assert_eq(result, df.set_index("a"))
+
+
+def test_sort_values_p2p_with_existing_divisions(client):
+    "Regression test for #8165"
+    df = pd.DataFrame(
+        {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)}
+    )
+    ddf = dd.from_pandas(
+        df,
+        npartitions=4,
+    )
+    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
+        ddf = ddf.set_index("a").sort_values("b")
+        result = client.compute(ddf, sync=True)
+        dd.assert_eq(
+            result,
+            df.set_index("a").sort_values("b"),
+            check_index=False,
+            sort_results=False,
+        )