Describe the issue:
The issue is originally described here: dask/dask-cloudprovider#423. But it does not seem like this is GCP specific issue, because the problematic message is from distributed.core, so also reporting here. Although, I did not have a chance to test it with other providers.
When logging level is set to DEBUG, which is the default one (?), then shuffling with shuffle_method="p2p" leads to workers silently dying.
Minimal Complete Verifiable Example:
import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.gcp import GCPCluster
@dask.delayed
def generate_df(n_rows):
import pandas as pd
from numpy.random import rand, randint
return pd.DataFrame({"id": randint(1, 100, n_rows), "value": rand(n_rows)})
with GCPCluster(zone="europe-west4-a", n_workers=2) as cluster:
with Client(cluster) as client:
# Generate a data frame with 2 columns
ddf = dd.from_delayed([generate_df(1_000_000) for _ in range(8)])
# "p2p" shuffling fails (this is also the default shuffle method):
n_rows = ddf.shuffle(on="id", shuffle_method="p2p").shape[0].compute()
# This does not throw an error, but workers hang, i.e.:
# - no heartbeat
# - no updates on the dashboard
# - no new logs after a huge binary dump
# and then silently die. I.e., the line below is never reached.
print("Row count after p2p shuffle: ", n_rows)
Leads to a log message on the worker that looks something like
2024-01-15 12:33:03,542 - distributed.core - DEBUG - Message from 'tls://10.164.15.225:33604': {'op': 'shuffle_receive', 'data': [(1, b'\xff\xff\xff\xff\xa0\x04\x00\x00\x10\x00\x00\x00\x00\x00\n\x00\x0e\x00\x06\x00\x05\x00\x08\x00\n\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00\n\x00\x0c\x00\x00\x00\x04\x00\x08\x00\n\x00\x00\x00x\x03\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x0
c\x00\x00\x00\x08\x00\x0c\x00\x04\x00\x08\x00\x08\x00\x00\x00\x08\x00\x00\x00\x10\x00\x00\x00\x06\x00\x00\x00pandas\x00\x00C\x03
\x00\x00{"index_columns": ["__index_level_0__"], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode",
"numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "id", "field_name": "id", "pandas_type": "int
64", "numpy_type": "int64", "metadata": null}, {"name": "value", "field_name": "value", "pandas_type": "float64", "numpy_type":
"float64", "metadata": null}, {"name": "_partitions", "field_name": "_partitions", "pandas_type": "uint64", "numpy_type": "uint6
4", "metadata": null}, {"name": "_worker", "field_name": "_worker", "pandas_type": "int8", "numpy_type": "int8", "metadata": nul
l}, {"name": null, "field_name": "__index_level_0__", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}], "creato
r": {"library": "pyarrow", "version": "14.0.2"}, "pandas_version": "2.1.4"}\x00\x04\x00\x00\x00\xc4\x00\x00\x00\x80\x00\x00\x00D
\x00\x00\x00\x04\x00\x00\x00\\\xff\xff\xff\x00\x00\x01\x02\x10\x00\x00\x00$\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x11\x00\
x00\x00__index_level_0__\x00\x00\x00\\\xff\xff\xff\x00\x00\x00\x01@\x00\x00\x00\x98\xff\xff\xff\x00\x00\x01\x02\x10\x00\x00\x00$
\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00_partitions\x00\x00\x00\x06\x00\x08\x00\x04\x00\x06\x00\x00\x00@\x00
\x00\x00\xd0\xff\xff\xff\x00\x00\x01\x03\x10\x00\x00\x00\x1c\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00value\x0
0\x06\x00\x08\x00\x06\x00\x06\x00\x00\x00\x00\x00\x02\x00\x10\x00\x14\x00\x08\x00\x06\x00\x07\x00\x0c\x00\x00\x00\x10\x00\x10\x0
0\x00\x00\x00\x00\x01\x02\x10\x00\x00\x00\x1c\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00id\x00\x00\x08\x00\x0c\
x00\x08\x00\x07\x00\x08\x00\x00\x00\x00\x00\x00\x01@\x00\x00\x00\xff\xff\xff\xff\x18\x01\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00
\x0c\x00\x16\x00\x06\x00\x05\x00\x08\x00\x0c\x00\x0c\x00\x00\x00\x00\x03\x04\x00\x18\x00\x00\x00@1\xd4\x00\x00\x00\x00\x00\x00\x
00\n\x00\x18\x00\x0c\x00\x04\x00\x08\x00\n\x00\x00\x00\x9c\x00\x00\x00\x10\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x
00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x0c5\x00
\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x
a0\x18j\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x18j\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\xf0$\x9f\x00\
x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0$\x9f\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x
00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x
00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x
00\x00M\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00P\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00
\x0f\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x00\x00\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00,\x00\x00\x00\x00\x00\x00\x00E\x
00\x00\x00\x00\x00\x00\x00?\x00\x00\x00\x00\x00\x00\x00I\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00I\x00\x00\x00\x
00\x00\x00\x00M\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00_\x00
and then 15 MiB more of similar data.
Anything else we need to know?:
Adding
env_vars={
"DASK_DISTRIBUTED__LOGGING__DISTRIBUTED": "info",
},
to GCPCluster helps, so it seems to be a logging issue.
Environment:
- Dask version: 2024.1.0
- Python version: 3.10.13.final.0 on client (default on the remote machine, i.e. 3.10.12.final.
- Operating System: MacOS on client (default on the remote machine, i.e. Ubuntu minimal 18.04).
- Install method (conda, pip, source): Poetry on client (default on the remote machine, i.e. official Docker image).
Describe the issue:
The issue is originally described here: dask/dask-cloudprovider#423. But it does not seem like this is GCP specific issue, because the problematic message is from
distributed.core, so also reporting here. Although, I did not have a chance to test it with other providers.When logging level is set to DEBUG, which is the default one (?), then shuffling with
shuffle_method="p2p"leads to workers silently dying.Minimal Complete Verifiable Example:
Leads to a log message on the worker that looks something like
and then 15 MiB more of similar data.
Anything else we need to know?:
Adding
to
GCPClusterhelps, so it seems to be a logging issue.Environment: