Working with xarray, I need to load a series of datasets lazily (using a delayed approach). At a certain point in the workflow, these need to be combined into a single xr.Dataset through xr.combine_nested(). This step requires computing the delayed objects. To better follow the process, I'm using Distributed locally.
Up to v2021.2.0 everything went smoothly, but from v2021.3.0 onward (also tested on 2021.3.1 and 2021.4.0) I get the error below.
If I roll back to 2021.2.0, everything works again, so I would rule out a problem in the dataset or in the script. I also tried decreasing the chunk sizes, without any improvement.
To rule out other sources of the problem, I tested with different versions of Python (3.8 and 3.9). The result is the same: no problem on 2021.2.0, and the same error on 2021.3.0, 2021.3.1 and 2021.4.0.
Unfortunately, despite trying, I haven't been able to reproduce the error in an MCVE; maybe I oversimplified the dataset (still trying). Most probably, since I don't really know why this problem is appearing, I'm unable to create a proper example.
Any idea where this could come from? It seems to me that the message exchanged between the scheduler and the worker is too big, but I'm not aware of any new threshold introduced between 2021.2 and 2021.3.
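As a quick sanity check on the numbers in the traceback, here is a minimal sketch in plain Python (no Dask needed). It only confirms that the reported length is larger than the `max_bin_len` quoted in the error, which happens to be the largest signed 32-bit integer. The helper names are hypothetical, and `pickle` is used purely as a stand-in for estimating how big an object gets once serialized; I'm assuming, not claiming, that this approximates what Distributed actually sends.

```python
import pickle

# Limit quoted in the ValueError: the largest signed 32-bit integer.
MAX_BIN_LEN = 2**31 - 1      # 2147483647
# Length reported in the ValueError.
REPORTED_LEN = 2535607755


def exceeds_limit(nbytes, limit=MAX_BIN_LEN):
    """Return True if a binary payload of `nbytes` is larger than `limit`."""
    return nbytes > limit


def to_gib(nbytes):
    """Convert a byte count to GiB."""
    return nbytes / 2**30


def pickled_size(obj):
    """Rough serialized size of `obj` in bytes (pickle as a stand-in)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


print(exceeds_limit(REPORTED_LEN))       # True
print(round(to_gib(REPORTED_LEN), 2))    # 2.36
print(REPORTED_LEN - MAX_BIN_LEN)        # 388124108
```

So a single binary chunk of about 2.36 GiB is being sent, roughly 388 MB over the limit. Something like `pickled_size(some_object)` could help spot which object embedded in the graph is that large, under the assumption stated above.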
```
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\protocol\core.py", line 107, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2535607755 exceeds max_bin_len(2147483647)
distributed.core - ERROR - 2535607755 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\core.py", line 555, in handle_stream
msgs = await comm.read()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\tcp.py", line 218, in read
msg = await from_frames(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 79, in from_frames
res = _from_frames()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 62, in _from_frames
return protocol.loads(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\protocol\core.py", line 107, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2535607755 exceeds max_bin_len(2147483647)
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\core.py", line 501, in handle_comm
result = await result
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\scheduler.py", line 4718, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\core.py", line 555, in handle_stream
msgs = await comm.read()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\tcp.py", line 218, in read
msg = await from_frames(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 79, in from_frames
res = _from_frames()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 62, in _from_frames
return protocol.loads(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\protocol\core.py", line 107, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2535607755 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x000001A84C410700>, <Task finished name='Task-51' coro=<BaseTCPListener._handle_stream() done, defined at C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\tcp.py:476> exception=ValueError('2535607755 exceeds max_bin_len(2147483647)')>)
Traceback (most recent call last):
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
ret = callback()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\tornado\tcpserver.py", line 331, in <lambda>
gen.convert_yielded(future), lambda f: f.result()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\tcp.py", line 493, in _handle_stream
await self.comm_handler(comm)
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\core.py", line 501, in handle_comm
result = await result
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\scheduler.py", line 4718, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\core.py", line 555, in handle_stream
msgs = await comm.read()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\tcp.py", line 218, in read
msg = await from_frames(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 79, in from_frames
res = _from_frames()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\comm\utils.py", line 62, in _from_frames
return protocol.loads(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\protocol\core.py", line 107, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2535607755 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\IPython\core\interactiveshell.py", line 3437, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-1-97c88c7c0da0>", line 1, in <module>
dask.compute(da_vegetation)
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\dask\base.py", line 566, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\client.py", line 2666, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\client.py", line 1975, in gather
return self.sync(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\client.py", line 843, in sync
return sync(
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\utils.py", line 353, in sync
raise exc.with_traceback(tb)
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\utils.py", line 336, in f
result[0] = yield future
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\tornado\gen.py", line 762, in run
value = future.result()
File "C:\Users\Pier\Anaconda3\envs\trenove\lib\site-packages\distributed\client.py", line 1841, in _gather
raise exc
concurrent.futures._base.CancelledError: ('rechunk-merge-8508014ddd8426bfbaea85f4694f8042', 0, 6, 2)```