p2p shuffle - ArrowInvalid error #7400

@ncclementi

Description

I was trying to run the p2p shuffle on one of the h2o benchmark queries and got this error.

MRE:

from dask.distributed import Client
import dask.dataframe as dd

client = Client()


ddf_pq = dd.read_parquet(
    "s3://coiled-datasets/h2o-benchmark/N_1e7_K_1e2_parquet/*.parquet",
    storage_options={"anon": True},
)

ddf = ddf_pq[["id3", "v1", "v3"]]
(
    ddf.groupby("id3")
    .agg({"v1": "sum", "v3": "mean"}, shuffle="p2p")
    .compute()
)

Error:

Exception: ArrowInvalid('IPC stream did not have the expected number (1) of dictionaries at the start of the stream')
Traceback
2022-12-12 17:54:21,150 - distributed.shuffle._shuffle_extension - CRITICAL - Shuffle inputs done <Shuffle id: 4e2a7ffe9a2f5f90f47597a223d636ea on tcp://127.0.0.1:57400>
2022-12-12 17:54:21,150 - distributed.shuffle._shuffle_extension - CRITICAL - Shuffle inputs done <Shuffle id: 4e2a7ffe9a2f5f90f47597a223d636ea on tcp://127.0.0.1:57402>
2022-12-12 17:54:21,162 - distributed.shuffle._shuffle_extension - CRITICAL - Shuffle inputs done <Shuffle id: 4e2a7ffe9a2f5f90f47597a223d636ea on tcp://127.0.0.1:57401>
2022-12-12 17:54:21,177 - distributed.core - ERROR - Exception while handling op shuffle_receive
Traceback (most recent call last):
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 337, in shuffle_receive
    await shuffle.receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 194, in receive
    await self._receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 205, in _receive
    data = await self.offload(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 177, in offload
    return await asyncio.get_running_loop().run_in_executor(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_arrow.py", line 62, in list_of_buffers_to_table
    data = sr.read_all()
  File "pyarrow/ipc.pxi", line 691, in pyarrow.lib.RecordBatchReader.read_all
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream
2022-12-12 17:54:21,268 - distributed.shuffle._comms - ERROR - IPC stream did not have the expected number (1) of dictionaries at the start of the stream
Traceback (most recent call last):
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_comms.py", line 70, in _process
    await self.send(address, [_join_shards(shards)])
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 170, in send
    return await self.rpc(address).shuffle_receive(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 1163, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 953, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 337, in shuffle_receive
    await shuffle.receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 194, in receive
    await self._receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 205, in _receive
    data = await self.offload(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 177, in offload
    return await asyncio.get_running_loop().run_in_executor(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_arrow.py", line 62, in list_of_buffers_to_table
    data = sr.read_all()
  File "pyarrow/ipc.pxi", line 691, in pyarrow.lib.RecordBatchReader.read_all
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream
2022-12-12 17:54:21,270 - distributed.shuffle._shuffle_extension - ERROR - IPC stream did not have the expected number (1) of dictionaries at the start of the stream
Traceback (most recent call last):
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 346, in shuffle_inputs_done
    await shuffle.inputs_done()
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 274, in inputs_done
    self._comm_buffer.raise_on_exception()
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_buffer.py", line 210, in raise_on_exception
    raise self._exception
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_buffer.py", line 103, in process
    await self._process(id, shards)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_comms.py", line 70, in _process
    await self.send(address, [_join_shards(shards)])
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 170, in send
    return await self.rpc(address).shuffle_receive(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 1163, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 953, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 337, in shuffle_receive
    await shuffle.receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 194, in receive
    await self._receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 205, in _receive
    data = await self.offload(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 177, in offload
    return await asyncio.get_running_loop().run_in_executor(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_arrow.py", line 62, in list_of_buffers_to_table
    data = sr.read_all()
  File "pyarrow/ipc.pxi", line 691, in pyarrow.lib.RecordBatchReader.read_all
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream
2022-12-12 17:54:21,270 - distributed.core - ERROR - Exception while handling op shuffle_inputs_done
Traceback (most recent call last):
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 346, in shuffle_inputs_done
    await shuffle.inputs_done()
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 274, in inputs_done
    self._comm_buffer.raise_on_exception()
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_buffer.py", line 210, in raise_on_exception
    raise self._exception
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_buffer.py", line 103, in process
    await self._process(id, shards)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_comms.py", line 70, in _process
    await self.send(address, [_join_shards(shards)])
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 170, in send
    return await self.rpc(address).shuffle_receive(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 1163, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 953, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 337, in shuffle_receive
    await shuffle.receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 194, in receive
    await self._receive(data)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 205, in _receive
    data = await self.offload(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py", line 177, in offload
    return await asyncio.get_running_loop().run_in_executor(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_arrow.py", line 62, in list_of_buffers_to_table
    data = sr.read_all()
  File "pyarrow/ipc.pxi", line 691, in pyarrow.lib.RecordBatchReader.read_all
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream
2022-12-12 17:54:21,272 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:57399 failed: Exception: ArrowInvalid('IPC stream did not have the expected number (1) of dictionaries at the start of the stream')
2022-12-12 17:54:21,274 - distributed.core - ERROR - Exception while handling op broadcast
Traceback (most recent call last):
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 771, in _handle_comm
    result = await result
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/scheduler.py", line 5960, in broadcast
    results = await All(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py", line 237, in All
    result = await tasks.next()
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/scheduler.py", line 5938, in send_message
    resp = await send_recv(
  File "/Users/ncclementi/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py", line 955, in send_recv
    raise Exception(response["exception_text"])
Exception: ArrowInvalid('IPC stream did not have the expected number (1) of dictionaries at the start of the stream')
2022-12-12 17:54:21,282 - distributed.worker - WARNING - Compute Failed
Key:       shuffle-barrier-4e2a7ffe9a2f5f90f47597a223d636ea
Function:  shuffle_barrier
args:      ('4e2a7ffe9a2f5f90f47597a223d636ea', [None])
kwargs:    {}
Exception: 'Exception("ArrowInvalid(\'IPC stream did not have the expected number (1) of dictionaries at the start of the stream\')")'

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
File <timed exec>:5

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/client.py:3122, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3120         should_rejoin = False
   3121 try:
-> 3122     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3123 finally:
   3124     for f in futures.values():

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/client.py:2291, in Client.gather(self, futures, errors, direct, asynchronous)
   2289 else:
   2290     local_worker = None
-> 2291 return self.sync(
   2292     self._gather,
   2293     futures,
   2294     errors=errors,
   2295     direct=direct,
   2296     local_worker=local_worker,
   2297     asynchronous=asynchronous,
   2298 )

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    337     return future
    338 else:
--> 339     return sync(
    340         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    341     )

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:379, in sync.<locals>.f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/client.py:2154, in Client._gather(self, futures, errors, direct, local_worker)
   2152         exc = CancelledError(key)
   2153     else:
-> 2154         raise exception.with_traceback(traceback)
   2155     raise exc
   2156 if errors == "skip":

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py:56, in shuffle_barrier()
     55 def shuffle_barrier(id: ShuffleId, transfers: list[None]) -> None:
---> 56     return _get_worker_extension().barrier(id)

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py:467, in barrier()
    466 def barrier(self, shuffle_id: ShuffleId) -> None:
--> 467     sync(self.worker.loop, self._barrier, shuffle_id)

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:406, in sync()
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:379, in f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/tornado/gen.py:762, in run()
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py:378, in _barrier()
    375 # Tell all peers that we've reached the barrier
    376 # Note that this will call `shuffle_inputs_done` on our own worker as well
    377 shuffle = await self._get_shuffle(shuffle_id)
--> 378 await shuffle.barrier()

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/shuffle/_shuffle_extension.py:159, in barrier()
    152 async def barrier(self) -> None:
    153     # FIXME: This should restrict communication to only workers
    154     # participating in this specific shuffle. This will not only reduce the
   (...)
    157     # TODO: Consider broadcast pinging once when the shuffle starts to warm
    158     # up the comm pool on scheduler side
--> 159     out = await self.broadcast(
    160         msg={"op": "shuffle_inputs_done", "shuffle_id": self.id}
    161     )
    162     if not self.output_workers.issubset(set(out)):
    163         raise ValueError(
    164             "Some critical workers have left",
    165             set(self.output_workers) - set(out),
    166         )

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py:1163, in send_recv_from_rpc()
   1161 prev_name, comm.name = comm.name, "ConnectionPool." + key
   1162 try:
-> 1163     return await send_recv(comm=comm, op=key, **kwargs)
   1164 finally:
   1165     self.pool.reuse(self.addr, comm)

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py:953, in send_recv()
    951     _, exc, tb = clean_exception(**response)
    952     assert exc
--> 953     raise exc.with_traceback(tb)
    954 else:
    955     raise Exception(response["exception_text"])

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py:771, in _handle_comm()
    769     result = handler(**msg)
    770 if inspect.iscoroutine(result):
--> 771     result = await result
    772 elif inspect.isawaitable(result):
    773     raise RuntimeError(
    774         f"Comm handler returned unknown awaitable. Expected coroutine, instead got {type(result)}"
    775     )

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/scheduler.py:5960, in broadcast()
   5954         else:
   5955             raise ValueError(
   5956                 "on_error must be 'raise', 'return', 'return_pickle', "
   5957                 f"or 'ignore'; got {on_error!r}"
   5958             )
-> 5960 results = await All(
   5961     [send_message(address) for address in addresses if address is not None]
   5962 )
   5964 return {k: v for k, v in zip(workers, results) if v is not ERROR}

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/utils.py:237, in All()
    235 while not tasks.done():
    236     try:
--> 237         result = await tasks.next()
    238     except Exception:
    240         @gen.coroutine
    241         def quiet():

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/scheduler.py:5938, in send_message()
   5936 comm.name = "Scheduler Broadcast"
   5937 try:
-> 5938     resp = await send_recv(
   5939         comm, close=True, serializers=serializers, **msg
   5940     )
   5941 finally:
   5942     self.rpc.reuse(addr, comm)

File ~/mambaforge/envs/dask-tutorial/lib/python3.10/site-packages/distributed/core.py:955, in send_recv()
    953         raise exc.with_traceback(tb)
    954     else:
--> 955         raise Exception(response["exception_text"])
    956 return response

Exception: ArrowInvalid('IPC stream did not have the expected number (1) of dictionaries at the start of the stream')
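For what it's worth, the ArrowInvalid itself is easy to trigger in plain pyarrow. Below is a minimal sketch, based only on my guess (not a confirmed reading of the shuffle code) that record-batch bytes get written separately from the schema, so the dictionary batch that a dictionary-encoded (categorical) column needs never ends up in the reassembled stream; the id3 column here is just illustrative:

import io
import pyarrow as pa

# Dictionary-encoded column, i.e. what a pandas "category" column becomes in Arrow.
table = pa.table({"id3": pa.array(["a", "b", "a"]).dictionary_encode()})
batch = table.to_batches()[0]

# Writing schema bytes + record-batch bytes by hand skips the dictionary batch
# that a proper IPC stream writer would emit in between them.
buf = io.BytesIO()
buf.write(batch.schema.serialize())
buf.write(batch.serialize())
buf.seek(0)

pa.ipc.open_stream(buf).read_all()
# pyarrow.lib.ArrowInvalid: IPC stream did not have the expected number (1)
# of dictionaries at the start of the stream

If that guess is right, any dataframe with a categorical/dictionary-encoded column going through the p2p shuffle would hit this.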

dask/distributed version: 2022.11.1 (also tested on main, and it fails there too)
pyarrow version: 10.0.1
pandas version: 1.5.1
Installation: conda-forge

note: I installed things via mamba install coiled-runtime=0.2.1
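In case it helps narrow this down: assuming the dictionary-encoded (categorical) column is the trigger, casting it to a plain string dtype before the groupby might sidestep the error. Untested sketch (id3 is only my guess at the offending column):

ddf = ddf.astype({"id3": "string"})  # drop the categorical/dictionary encoding
(
    ddf.groupby("id3")
    .agg({"v1": "sum", "v3": "mean"}, shuffle="p2p")
    .compute()
)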

cc: @jrbourbeau @hendrikmakait
