From 32fe84610f2ee1dfdee6a0900cfaa38cf12fe307 Mon Sep 17 00:00:00 2001
From: detroyejr
Date: Thu, 15 Jun 2023 11:37:51 -0400
Subject: [PATCH 1/4] Promote null types during concat_tables (#7837).

Columns with sparse data can produce partitions in which a column is
entirely empty. PyArrow then infers conflicting data types for that
column across partitions, which raises an error during concatenation.

squash! Promote null types during concat_tables (#7837).
---
 distributed/shuffle/_arrow.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py
index f00bd05caed..a1068fce1f2 100644
--- a/distributed/shuffle/_arrow.py
+++ b/distributed/shuffle/_arrow.py
@@ -55,7 +55,7 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame:
     while file.tell() < end:
         sr = pa.RecordBatchStreamReader(file)
         shards.append(sr.read_all())
-    table = pa.concat_tables(shards)
+    table = pa.concat_tables(shards, promote=True)
     df = table.to_pandas(self_destruct=True)

     def default_types_mapper(pyarrow_dtype: pa.DataType) -> object:
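
For reference, the failure mode this first patch addresses can be reproduced
with pyarrow alone. Below is a minimal standalone sketch, not part of the
patch series: the table and column names are invented for illustration, and
promote=True reflects the pyarrow API current when this series was written
(newer pyarrow releases deprecate it in favour of promote_options):

    import pyarrow as pa

    # A partition in which a column holds no values at all: pyarrow can only
    # infer the placeholder type "null" for it.
    empty = pa.table({"x": pa.array([None, None])})   # x: null
    # A partition in which the same column holds real data.
    full = pa.table({"x": pa.array(["a", "b"])})      # x: string

    # With the default promote=False, the schemas conflict and pyarrow raises.
    try:
        pa.concat_tables([empty, full])
    except pa.ArrowInvalid as exc:
        print(f"concat without promotion failed: {exc}")

    # promote=True unifies the schemas instead, promoting the null-typed
    # column to the concrete type found in the other partition.
    table = pa.concat_tables([empty, full], promote=True)
    print(table.schema)   # x: string
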
From 7ec002d3dc90a5b41c6a2ab9ca540febd37bb065 Mon Sep 17 00:00:00 2001
From: detroyejr
Date: Thu, 15 Jun 2023 19:30:20 -0400
Subject: [PATCH 2/4] Test handling null partitions during p2p shuffle.

---
 distributed/tests/test_client.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 3cc40f1a0c9..3883c1ffa03 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -3211,6 +3211,31 @@ async def test_cancel_clears_processing(c, s, *workers):
     s.validate_state()


+@gen_cluster(client=True)
+async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
+    has_pyarrow = False
+    try:
+        check_minimal_arrow_version()
+        has_pyarrow = True
+    except RuntimeError:
+        pass
+
+    if has_pyarrow:
+        import pandas as pd
+
+        dd = pytest.importorskip("dask.dataframe")
+        data = [
+            {"companies": [], "id": "a", "x": None},
+            {"companies": [{"id": 3}, {"id": 5}], "id": "b", "x": None},
+            {"companies": [{"id": 3}, {"id": 4}, {"id": 5}], "id": "c", "x": "b"},
+            {"companies": [{"id": 9}], "id": "a", "x": "a"},
+        ]
+        df = pd.DataFrame(data)
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
+        await c.compute(ddf)
+
+
 def test_default_get(loop_in_thread):
     has_pyarrow = False
     try:

From be815d2ee6c57c5e3e51c8ffde24f566677264c7 Mon Sep 17 00:00:00 2001
From: detroyejr
Date: Fri, 16 Jun 2023 09:05:19 -0400
Subject: [PATCH 3/4] Move to shuffle/tests and add an assert.

Move the test to distributed/shuffle/tests, add cleanup based on
test_concurrent, and assert that the final data returned by the dask
DataFrame matches the original pandas DataFrame.
---
 distributed/shuffle/tests/test_shuffle.py | 19 +++++++++++++++++
 distributed/tests/test_client.py          | 25 -----------------------
 2 files changed, 19 insertions(+), 25 deletions(-)

diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 3192ef149a5..ed4e46735ee 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -1826,3 +1826,22 @@ async def test_closed_worker_returns_before_barrier(c, s):
     await c.close()
     await asyncio.gather(*[clean_worker(w) for w in workers])
     await clean_scheduler(s)
+
+
+@gen_cluster(client=True)
+async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
+    data = [
+        {"companies": [], "id": "a", "x": None},
+        {"companies": [{"id": 3}, {"id": 5}], "id": "b", "x": None},
+        {"companies": [{"id": 3}, {"id": 4}, {"id": 5}], "id": "c", "x": "b"},
+        {"companies": [{"id": 9}], "id": "a", "x": "a"},
+    ]
+    df = pd.DataFrame(data)
+    ddf = dd.from_pandas(df, npartitions=2)
+    ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
+    result = await c.compute(ddf)
+    assert dask.dataframe.utils.assert_eq(result, df)
+
+    await c.close()
+    await asyncio.gather(*[clean_worker(w) for w in workers])
+    await clean_scheduler(s)
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 3883c1ffa03..3cc40f1a0c9 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -3211,31 +3211,6 @@ async def test_cancel_clears_processing(c, s, *workers):
     s.validate_state()


-@gen_cluster(client=True)
-async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
-    has_pyarrow = False
-    try:
-        check_minimal_arrow_version()
-        has_pyarrow = True
-    except RuntimeError:
-        pass
-
-    if has_pyarrow:
-        import pandas as pd
-
-        dd = pytest.importorskip("dask.dataframe")
-        data = [
-            {"companies": [], "id": "a", "x": None},
-            {"companies": [{"id": 3}, {"id": 5}], "id": "b", "x": None},
-            {"companies": [{"id": 3}, {"id": 4}, {"id": 5}], "id": "c", "x": "b"},
-            {"companies": [{"id": 9}], "id": "a", "x": "a"},
-        ]
-        df = pd.DataFrame(data)
-        ddf = dd.from_pandas(df, npartitions=2)
-        ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
-        await c.compute(ddf)
-
-
 def test_default_get(loop_in_thread):
     has_pyarrow = False
     try:

From 3635803a453041d01583df021a5492c64f2dc245 Mon Sep 17 00:00:00 2001
From: Jonathan De Troye
Date: Tue, 20 Jun 2023 10:47:28 -0400
Subject: [PATCH 4/4] Reuse imported dask.dataframe and drop assert.

Co-authored-by: Hendrik Makait
---
 distributed/shuffle/tests/test_shuffle.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index ed4e46735ee..66382babddc 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -1840,7 +1840,7 @@ async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
     ddf = dd.from_pandas(df, npartitions=2)
     ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
     result = await c.compute(ddf)
-    assert dask.dataframe.utils.assert_eq(result, df)
+    dd.assert_eq(result, df)

     await c.close()
     await asyncio.gather(*[clean_worker(w) for w in workers])
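
One note on the final change: dask.dataframe's assert_eq computes the dask
collection, compares it against the expected frame, and raises AssertionError
itself on mismatch, so wrapping it in a bare assert (as the previous commit
did) is redundant. A minimal standalone sketch of that behaviour, with
invented toy data:

    import dask.dataframe as dd
    import pandas as pd

    df = pd.DataFrame({"id": ["a", "b"], "x": [1, 2]})
    ddf = dd.from_pandas(df, npartitions=2)

    # Passes silently when the collections hold equal data.
    dd.assert_eq(ddf, df)

    # Raises AssertionError (rather than returning False) on mismatch.
    try:
        dd.assert_eq(ddf, df.assign(x=[1, 3]))
    except AssertionError as exc:
        print(f"mismatch detected: {exc}")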