From 08baced64ea1030ad30e00458369cb2fd024ca23 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 12:21:09 +0200 Subject: [PATCH 01/10] Add tests --- distributed/shuffle/tests/test_shuffle.py | 45 +++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index fc78ef1fda1..cac4f43ab84 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2165,3 +2165,48 @@ async def test_set_index_p2p(c, s, *workers): await c.close() await asyncio.gather(*[check_worker_cleanup(w) for w in workers]) await check_scheduler_cleanup(s) + + +def test_shuffle_p2p_with_existing_index(): + df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) + ddf = dd.from_pandas( + df, + npartitions=4, + ) + with Client() as c: + ddf = ddf.shuffle("a", shuffle="p2p") + result = c.compute(ddf, sync=True) + dd.assert_eq(result, df) + + +def test_set_index_p2p_with_existing_index(): + df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) + ddf = dd.from_pandas( + df, + npartitions=4, + ) + with Client() as c: + ddf = ddf.set_index("a", shuffle="p2p") + result = c.compute(ddf, sync=True) + dd.assert_eq(result, df.set_index("a")) + + +def test_sort_values_p2p_with_existing_divisions(): + "Regression test for #8165" + df = pd.DataFrame( + {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)} + ) + ddf = dd.from_pandas( + df, + npartitions=4, + ) + with Client() as c: + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.set_index("a").sort_values("b") + result = c.compute(ddf, sync=True) + dd.assert_eq( + result, + df.set_index("a").sort_values("b"), + check_index=False, + sort_results=False, + ) From 402593823d5f805054d5a61a9660384945386f20 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 12:28:30 +0200 Subject: [PATCH 02/10] Raise early 
if column is not int --- distributed/shuffle/_shuffle.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index e6e9c2fdeb0..a3f01a35996 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -104,9 +104,15 @@ def rearrange_by_column_p2p( column: str, npartitions: int | None = None, ) -> DataFrame: + import pandas as pd + from dask.dataframe.core import new_dd_object meta = df._meta + if not pd.api.types.is_integer_dtype(meta[column]): + raise TypeError( + f"Expected meta {column=} to be an integer column, is {meta[column].dtype}." + ) check_dtype_support(meta) npartitions = npartitions or df.npartitions token = tokenize(df, column, npartitions) From 6b8c98c277f7897a44ae9a2d020708dc31f0418b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 12:32:58 +0200 Subject: [PATCH 03/10] update tests --- distributed/shuffle/tests/test_shuffle.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index cac4f43ab84..a671d68245b 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2186,9 +2186,8 @@ def test_set_index_p2p_with_existing_index(): npartitions=4, ) with Client() as c: - ddf = ddf.set_index("a", shuffle="p2p") - result = c.compute(ddf, sync=True) - dd.assert_eq(result, df.set_index("a")) + with pytest.raises(TypeError, match="_partitions.*integer"): + ddf.set_index("a", shuffle="p2p") def test_sort_values_p2p_with_existing_divisions(): @@ -2202,11 +2201,5 @@ def test_sort_values_p2p_with_existing_divisions(): ) with Client() as c: with dask.config.set({"dataframe.shuffle.method": "p2p"}): - ddf = ddf.set_index("a").sort_values("b") - result = c.compute(ddf, sync=True) - dd.assert_eq( - result, - df.set_index("a").sort_values("b"), - check_index=False, - sort_results=False, - ) + 
with pytest.raises(TypeError, match="_partitions.*integer"): + ddf = ddf.set_index("a").sort_values("b") From f65eae950b016646d6185e22b18ba61b6b409f1b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 13:51:19 +0200 Subject: [PATCH 04/10] Update --- distributed/shuffle/tests/test_shuffle.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index a671d68245b..522eefdaa7f 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2167,30 +2167,34 @@ async def test_set_index_p2p(c, s, *workers): await check_scheduler_cleanup(s) -def test_shuffle_p2p_with_existing_index(): +def test_shuffle_p2p_with_existing_index(loop): df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) ddf = dd.from_pandas( df, npartitions=4, ) - with Client() as c: + with LocalCluster( + n_workers=2, dashboard_address=":0", loop=loop + ) as cluster, Client(cluster) as c: ddf = ddf.shuffle("a", shuffle="p2p") result = c.compute(ddf, sync=True) dd.assert_eq(result, df) -def test_set_index_p2p_with_existing_index(): +def test_set_index_p2p_with_existing_index(loop): df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) ddf = dd.from_pandas( df, npartitions=4, ) - with Client() as c: + with LocalCluster( + n_workers=2, dashboard_address=":0", loop=loop + ) as cluster, Client(cluster) as c: with pytest.raises(TypeError, match="_partitions.*integer"): ddf.set_index("a", shuffle="p2p") -def test_sort_values_p2p_with_existing_divisions(): +def test_sort_values_p2p_with_existing_divisions(loop): "Regression test for #8165" df = pd.DataFrame( {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)} @@ -2199,7 +2203,9 @@ def test_sort_values_p2p_with_existing_divisions(): df, npartitions=4, ) - with Client() as c: + with LocalCluster( + n_workers=2, 
dashboard_address=":0", loop=loop + ) as cluster, Client(cluster) as c: with dask.config.set({"dataframe.shuffle.method": "p2p"}): with pytest.raises(TypeError, match="_partitions.*integer"): ddf = ddf.set_index("a").sort_values("b") From 30143636b60f5102950b5ac24e4c77852b3e7071 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 13:58:10 +0200 Subject: [PATCH 05/10] Update tests --- distributed/shuffle/tests/test_shuffle.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 522eefdaa7f..3c733249837 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2190,8 +2190,9 @@ def test_set_index_p2p_with_existing_index(loop): with LocalCluster( n_workers=2, dashboard_address=":0", loop=loop ) as cluster, Client(cluster) as c: - with pytest.raises(TypeError, match="_partitions.*integer"): - ddf.set_index("a", shuffle="p2p") + ddf.set_index("a", shuffle="p2p") + result = c.compute(ddf, sync=True) + dd.assert_eq(result, df) def test_sort_values_p2p_with_existing_divisions(loop): @@ -2207,5 +2208,11 @@ def test_sort_values_p2p_with_existing_divisions(loop): n_workers=2, dashboard_address=":0", loop=loop ) as cluster, Client(cluster) as c: with dask.config.set({"dataframe.shuffle.method": "p2p"}): - with pytest.raises(TypeError, match="_partitions.*integer"): - ddf = ddf.set_index("a").sort_values("b") + ddf = ddf.set_index("a").sort_values("b") + result = c.compute(ddf, sync=True) + dd.assert_eq( + result, + df.set_index("a").sort_values("b"), + check_index=False, + sort_results=False, + ) From a1d75e1baaa4993583fb3fc59185ea552de4536a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 13:59:48 +0200 Subject: [PATCH 06/10] [skip-caching] From 068035f6746bbce43baad1cb1c719264c17a5c92 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 15:52:12 +0200 
Subject: [PATCH 07/10] Fix test --- distributed/shuffle/tests/test_shuffle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 3c733249837..01eaf7243d4 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2190,9 +2190,9 @@ def test_set_index_p2p_with_existing_index(loop): with LocalCluster( n_workers=2, dashboard_address=":0", loop=loop ) as cluster, Client(cluster) as c: - ddf.set_index("a", shuffle="p2p") + ddf = ddf.set_index("a", shuffle="p2p") result = c.compute(ddf, sync=True) - dd.assert_eq(result, df) + dd.assert_eq(result, df.set_index("a")) def test_sort_values_p2p_with_existing_divisions(loop): From 14af0a4cbf564d66fc7456a0d41d9903b78fba68 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 15:52:38 +0200 Subject: [PATCH 08/10] Update distributed/shuffle/_shuffle.py Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> --- distributed/shuffle/_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index a3f01a35996..ef171425ead 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -109,7 +109,7 @@ def rearrange_by_column_p2p( from dask.dataframe.core import new_dd_object meta = df._meta - if not pd.api.types.is_integer_dtype(meta[column]): + if not pd.api.types.is_integer_dtype(meta[column].dtype): raise TypeError( f"Expected meta {column=} to be an integer column, is {meta[column].dtype}." 
) From f09bfb6ea5c7401d8af44a65f4708b7c0ef05d14 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 16:21:33 +0200 Subject: [PATCH 09/10] test cleanup --- distributed/shuffle/tests/test_shuffle.py | 45 +++++++++-------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 01eaf7243d4..5284d034ff8 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2167,35 +2167,29 @@ async def test_set_index_p2p(c, s, *workers): await check_scheduler_cleanup(s) -def test_shuffle_p2p_with_existing_index(loop): +def test_shuffle_p2p_with_existing_index(client): df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) ddf = dd.from_pandas( df, npartitions=4, ) - with LocalCluster( - n_workers=2, dashboard_address=":0", loop=loop - ) as cluster, Client(cluster) as c: - ddf = ddf.shuffle("a", shuffle="p2p") - result = c.compute(ddf, sync=True) - dd.assert_eq(result, df) + ddf = ddf.shuffle("a", shuffle="p2p") + result = client.compute(ddf, sync=True) + dd.assert_eq(result, df) -def test_set_index_p2p_with_existing_index(loop): +def test_set_index_p2p_with_existing_index(client): df = pd.DataFrame({"a": np.random.randint(0, 3, 20)}, index=np.random.random(20)) ddf = dd.from_pandas( df, npartitions=4, ) - with LocalCluster( - n_workers=2, dashboard_address=":0", loop=loop - ) as cluster, Client(cluster) as c: - ddf = ddf.set_index("a", shuffle="p2p") - result = c.compute(ddf, sync=True) - dd.assert_eq(result, df.set_index("a")) + ddf = ddf.set_index("a", shuffle="p2p") + result = client.compute(ddf, sync=True) + dd.assert_eq(result, df.set_index("a")) -def test_sort_values_p2p_with_existing_divisions(loop): +def test_sort_values_p2p_with_existing_divisions(client): "Regression test for #8165" df = pd.DataFrame( {"a": np.random.randint(0, 3, 20), "b": np.random.randint(0, 3, 20)} @@ 
-2204,15 +2198,12 @@ def test_sort_values_p2p_with_existing_divisions(loop): df, npartitions=4, ) - with LocalCluster( - n_workers=2, dashboard_address=":0", loop=loop - ) as cluster, Client(cluster) as c: - with dask.config.set({"dataframe.shuffle.method": "p2p"}): - ddf = ddf.set_index("a").sort_values("b") - result = c.compute(ddf, sync=True) - dd.assert_eq( - result, - df.set_index("a").sort_values("b"), - check_index=False, - sort_results=False, - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.set_index("a").sort_values("b") + result = client.compute(ddf, sync=True) + dd.assert_eq( + result, + df.set_index("a").sort_values("b"), + check_index=False, + sort_results=False, + ) From 6d3a76a04166bcffbc809e92d425aae6be529d95 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Sep 2023 16:53:07 +0200 Subject: [PATCH 10/10] [skip-caching]