diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py
index f00bd05caed..a1068fce1f2 100644
--- a/distributed/shuffle/_arrow.py
+++ b/distributed/shuffle/_arrow.py
@@ -55,7 +55,7 @@ def convert_partition(data: bytes, meta: pd.DataFrame) -> pd.DataFrame:
     while file.tell() < end:
         sr = pa.RecordBatchStreamReader(file)
         shards.append(sr.read_all())
-    table = pa.concat_tables(shards)
+    table = pa.concat_tables(shards, promote=True)
     df = table.to_pandas(self_destruct=True)

    def default_types_mapper(pyarrow_dtype: pa.DataType) -> object:
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 3192ef149a5..66382babddc 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -1826,3 +1826,22 @@ async def test_closed_worker_returns_before_barrier(c, s):
     await c.close()
     await asyncio.gather(*[clean_worker(w) for w in workers])
     await clean_scheduler(s)
+
+
+@gen_cluster(client=True)
+async def test_handle_null_partitions_p2p_shuffling(c, s, *workers):
+    data = [
+        {"companies": [], "id": "a", "x": None},
+        {"companies": [{"id": 3}, {"id": 5}], "id": "b", "x": None},
+        {"companies": [{"id": 3}, {"id": 4}, {"id": 5}], "id": "c", "x": "b"},
+        {"companies": [{"id": 9}], "id": "a", "x": "a"},
+    ]
+    df = pd.DataFrame(data)
+    ddf = dd.from_pandas(df, npartitions=2)
+    ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True)
+    result = await c.compute(ddf)
+    dd.assert_eq(result, df)
+
+    await c.close()
+    await asyncio.gather(*[clean_worker(w) for w in workers])
+    await clean_scheduler(s)
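
For reviewers, a minimal standalone sketch (not part of the patch, and the tables below are illustrative rather than taken from the test) of the failure mode this addresses: when a partition's column is entirely null, Arrow infers the `null` type for it, so shards of the same logical frame can arrive at `convert_partition` with mismatched schemas, and plain `pa.concat_tables` refuses to concatenate them. Note that the boolean `promote` keyword applies to pyarrow versions before it was deprecated in favor of `promote_options` in pyarrow 14.

```python
import pyarrow as pa

# Two shards of the same logical columns: in the first shard, "x" is
# all-null, so Arrow infers the null type; in the second it is a string.
t1 = pa.table({"id": ["a", "b"], "x": pa.array([None, None], type=pa.null())})
t2 = pa.table({"id": ["c", "d"], "x": ["b", "a"]})

# pa.concat_tables([t1, t2]) raises pa.ArrowInvalid because the schemas
# differ. With promote=True, the null column is promoted to string and
# back-filled with nulls, mirroring what the patched line relies on.
table = pa.concat_tables([t1, t2], promote=True)
print(table.schema.field("x").type)  # string
```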