diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index fc78ef1fda1..cac4f43ab84 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -2165,3 +2165,47 @@ async def test_set_index_p2p(c, s, *workers):
     await c.close()
     await asyncio.gather(*[check_worker_cleanup(w) for w in workers])
     await check_scheduler_cleanup(s)
+
+
+def test_shuffle_p2p_with_existing_index():
+    """A p2p shuffle on "a" must return the input frame, float index included."""
+    # Seeded generator so that a failure can be reproduced exactly.
+    rng = np.random.default_rng(42)
+    df = pd.DataFrame({"a": rng.integers(0, 3, 20)}, index=rng.random(20))
+    ddf = dd.from_pandas(df, npartitions=4)
+    with Client() as c:
+        shuffled = ddf.shuffle("a", shuffle="p2p")
+        result = c.compute(shuffled, sync=True)
+        dd.assert_eq(result, df)
+
+
+def test_set_index_p2p_with_existing_index():
+    """A p2p ``set_index("a")`` on a float-indexed frame must match pandas."""
+    # Seeded generator so that a failure can be reproduced exactly.
+    rng = np.random.default_rng(42)
+    df = pd.DataFrame({"a": rng.integers(0, 3, 20)}, index=rng.random(20))
+    ddf = dd.from_pandas(df, npartitions=4)
+    with Client() as c:
+        indexed = ddf.set_index("a", shuffle="p2p")
+        result = c.compute(indexed, sync=True)
+        dd.assert_eq(result, df.set_index("a"))
+
+
+def test_sort_values_p2p_with_existing_divisions():
+    """Regression test for #8165."""
+    # Seeded generator so that a failure can be reproduced exactly.
+    rng = np.random.default_rng(42)
+    df = pd.DataFrame({"a": rng.integers(0, 3, 20), "b": rng.integers(0, 3, 20)})
+    ddf = dd.from_pandas(df, npartitions=4)
+    with Client() as c:
+        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
+            sorted_ddf = ddf.set_index("a").sort_values("b")
+        result = c.compute(sorted_ddf, sync=True)
+        # The expected frame is the same set_index + sort_values chain in pandas;
+        # the comparison is asked to skip the index and not to re-sort results.
+        dd.assert_eq(
+            result,
+            df.set_index("a").sort_values("b"),
+            check_index=False,
+            sort_results=False,
+        )