p2p shuffling fails if Arrow infers different dtypes for partitions #7837

@phofl

Description

Describe the issue:

Arrow infers the x column of the first partition as null and that of the second partition as string; the mismatched schemas then clash when the output shards are concatenated.
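
At the pyarrow level, here is a minimal sketch of what I believe happens (illustrative only, not the actual shuffle code path): converting each partition to an Arrow table infers different types for x, and concatenating the tables then fails.

import pandas as pd
import pyarrow as pa

part1 = pd.DataFrame({"x": [None, None]})  # first partition: x is all missing
part2 = pd.DataFrame({"x": ["b", "a"]})    # second partition: x holds strings

t1 = pa.Table.from_pandas(part1)
t2 = pa.Table.from_pandas(part2)
print(t1.schema.field("x").type)  # null
print(t2.schema.field("x").type)  # string

pa.concat_tables([t1, t2])  # raises pyarrow.lib.ArrowInvalid: schemas differ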

Minimal Complete Verifiable Example:

import pandas as pd

import dask.dataframe as dd
from distributed import Client

client = Client()

data = [
    {'companies': [], 'id': 'a', "x": None},
    {'companies': [{'id': 3},  {'id': 5}], 'id': 'b', "x": None},
    {'companies': [{'id': 3}, {'id': 4}, {'id': 5}], 'id': 'c', "x": "b"},
    {'companies': [{'id': 9}], 'id': 'a', "x": "a"}
]
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.shuffle(on='id', shuffle="p2p", ignore_index=True)
ddf.compute()  # fails with the traceback below

2023-05-15 16:08:50,846 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-38d3d79031f25db9add57999b152c9b2', 1)
Function:  shuffle_unpack
args:      ('9d0c74e7ff3dcaba60ce1957cc329424', 1, 0)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 9d0c74e7ff3dcaba60ce1957cc329424')"

Traceback (most recent call last):
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 77, in shuffle_unpack
    return _get_worker_extension().get_output_partition(
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 903, in get_output_partition
    return sync(self.worker.loop, shuffle.get_output_partition, output_partition)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/utils.py", line 418, in sync
    raise exc.with_traceback(tb)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/utils.py", line 391, in f
    result = yield future
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 529, in get_output_partition
    out = await self.offload(_)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 132, in offload
    return await asyncio.get_running_loop().run_in_executor(
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 526, in _
    df = convert_partition(data)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_arrow.py", line 57, in convert_partition
    return pa.concat_tables(shards)
  File "pyarrow/table.pxi", line 5139, in pyarrow.lib.concat_tables
    c_result_table = GetResultValue(
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
    return check_status(status)
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
    raise ArrowInvalid(message)
pyarrow.lib.ArrowInvalid: Schema at index 1 was different: 
companies: list<item: struct<id: int64>>
id: string
x: null
_partitions: uint64
__index_level_0__: int64
vs
companies: list<item: struct<id: int64>>
id: string
x: string
_partitions: uint64
__index_level_0__: int64

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/patrick/Library/Application Support/JetBrains/PyCharm2023.1/scratches/scratch.py", line 521, in <module>
    ddf.compute()
  File "/Users/patrick/PycharmProjects/dask_dev/dask/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/patrick/PycharmProjects/dask_dev/dask/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/client.py", line 3224, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/client.py", line 2359, in gather
    return self.sync(
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/utils.py", line 351, in sync
    return sync(
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/utils.py", line 418, in sync
    raise exc.with_traceback(tb)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/utils.py", line 391, in f
    result = yield future
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/client.py", line 2222, in _gather
    raise exception.with_traceback(traceback)
  File "/Users/patrick/mambaforge/envs/dask-dev/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 81, in shuffle_unpack
    raise RuntimeError(f"shuffle_unpack failed during shuffle {id}") from e
RuntimeError: shuffle_unpack failed during shuffle 9d0c74e7ff3dcaba60ce1957cc329424

Anything else we need to know?:

Found here: https://dask.discourse.group/t/how-to-drop-duplicates-by-string-id-for-a-large-dataframe/1835
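
A possible user-side workaround (untested sketch): give x an explicit dtype before creating the Dask DataFrame, so Arrow does not have to infer it per partition. On the distributed side, pa.concat_tables also accepts a promote flag in recent pyarrow that unifies schemas by promoting null to the common type, which might be a fix direction for the convert_partition call shown in the traceback.

# Untested workaround sketch: an explicit dtype makes both partitions
# carry x: string in their Arrow schemas.
df["x"] = df["x"].astype("string")
ddf = dd.from_pandas(df, npartitions=2)

# Possible fix direction in distributed/shuffle/_arrow.py (untested):
# pa.concat_tables(shards, promote=True)  # promotes null -> string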

cc @hendrikmakait

Environment:

  • Dask version: main
  • Python version: 3.10
  • Operating System: mac
  • Install method (conda, pip, source):
