Describe the issue:
Dask silently fails to shuffle on a particular column using "p2p" shuffle method on GCP (but not sure if GCP specific issue). I.e.
ddf = ddf.shuffle(on=col_name, shuffle_method="p2p")
or
ddf = ddf.shuffle(on=col_name)
shows the described behaviour.
Minimal Complete Verifiable Example:
import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.gcp import GCPCluster
# Generate a data frame with random integers using delayed execution
@dask.delayed
def generate_df(n_rows):
from numpy.random import randint
from numpy.random import rand
import pandas as pd
return pd.DataFrame({"id": randint(1, 100, n_rows), "value": rand(n_rows)})
with GCPCluster(zone="europe-west4-a", n_workers=2) as cluster:
with Client(cluster) as client:
# Generate a data frame with 2 columns
ddf = dd.from_delayed([generate_df(1_000_000) for _ in range(8)])
# =====================================================================
# Make sure everything works (except for shuffling using p2p)
ddf = ddf.persist()
# Preview
print(ddf.head())
# id value
# 0 26 0.039676
# 1 18 0.217264
# 2 98 0.192971
# 3 11 0.140765
# 4 52 0.486008
# Count rows
n_rows = ddf.shape[0].compute()
print("Original row count:", n_rows)
# Original row count: 8000000
# Repartitioning works (and it also makes sure that the generated
# data is actually distributed across the workers)
ddf = ddf.repartition(npartitions=16)
n_rows = ddf.shape[0].compute()
print("Row count after repartitioning: ", n_rows)
# Row count after repartitioning: 8000000
# Individual partitions are also successfully computed
n_rows = ddf.repartition(npartitions=16).partitions[3].shape[0].compute()
print("Row count of partition 3: ", n_rows)
# Row count of partition 3: 500000
# "disk" shuffle works: data is lost, but this is expected using 2 workers
n_rows = ddf.shuffle(on="id", shuffle_method="disk").shape[0].compute()
print("Row count after disk shuffle: ", n_rows)
# Row count after disk shuffle: 2938772
# "tasks" shuffle works
n_rows = ddf.shuffle(on="id", shuffle_method="tasks").shape[0].compute()
print("Row count after tasks shuffle: ", n_rows)
# Row count after tasks shuffle: 8000000
# =====================================================================
# "p2p" fails.
n_rows = ddf.shuffle(on="id", shuffle_method="p2p").shape[0].compute()
# This does not throw an error, but workers hang, i.e.:
# - no heartbeat
# - no updates on the dashboard
# - no new logs
# and then silently die
print("Row count after p2p shuffle: ", n_rows)
Anything else we need to know?:
- Fails with a single worker too.
- Using "disk" works, but data is lost if there is more than 1 worker (which is expected).
- Using "tasks" works.
- When using
LocalCluster with 1 or more workers, all methods, including "p2p", work.
- Worker Docker logs (both worker logs look very similar) seem to contain a huge dump of binary data. This is a sample extract before failing:
2024-01-15 12:33:03,474 - distributed.worker - DEBUG - Request 1 keys from tls://10.164.15.224:44221
2024-01-15 12:33:03,482 - distributed.comm.core - DEBUG - Establishing connection to 10.164.15.224:44221
2024-01-15 12:33:03,509 - distributed.comm.tcp - DEBUG - Setting TCP keepalive: nprobes=10, idle=10, interval=2
2024-01-15 12:33:03,510 - distributed.comm.tcp - DEBUG - Setting TCP user timeout: 30000 ms
2024-01-15 12:33:03,510 - distributed.comm.tcp - DEBUG - TLS connection with 'tls://10.164.15.224:44221': protocol=TLSv1.3, cipher=TLS_AES_256_GCM_SHA384, bits=256
2024-01-15 12:33:03,538 - distributed.comm.core - DEBUG - Connection to 10.164.15.224:44221 established
2024-01-15 12:33:03,542 - distributed.core - DEBUG - Message from 'tls://10.164.15.225:33604': {'op': 'shuffle_receive', 'data': [(1, b'\xff\xff\xff\xff\xa0\x04\x00\x00\x10\x00\x00\x00\x00\x00\n\x00\x0e\x00\x06\x00\x05\x00\x08\x00\n\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00\n\x00\x0c\x00\x00\x00\x04\x00\x08\x00\n\x00\x00\x00x\x03\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x0
c\x00\x00\x00\x08\x00\x0c\x00\x04\x00\x08\x00\x08\x00\x00\x00\x08\x00\x00\x00\x10\x00\x00\x00\x06\x00\x00\x00pandas\x00\x00C\x03
\x00\x00{"index_columns": ["__index_level_0__"], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode",
"numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "id", "field_name": "id", "pandas_type": "int
64", "numpy_type": "int64", "metadata": null}, {"name": "value", "field_name": "value", "pandas_type": "float64", "numpy_type":
"float64", "metadata": null}, {"name": "_partitions", "field_name": "_partitions", "pandas_type": "uint64", "numpy_type": "uint6
4", "metadata": null}, {"name": "_worker", "field_name": "_worker", "pandas_type": "int8", "numpy_type": "int8", "metadata": nul
l}, {"name": null, "field_name": "__index_level_0__", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}], "creato
r": {"library": "pyarrow", "version": "14.0.2"}, "pandas_version": "2.1.4"}\x00\x04\x00\x00\x00\xc4\x00\x00\x00\x80\x00\x00\x00D
\x00\x00\x00\x04\x00\x00\x00\\\xff\xff\xff\x00\x00\x01\x02\x10\x00\x00\x00$\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x11\x00\
x00\x00__index_level_0__\x00\x00\x00\\\xff\xff\xff\x00\x00\x00\x01@\x00\x00\x00\x98\xff\xff\xff\x00\x00\x01\x02\x10\x00\x00\x00$
\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00_partitions\x00\x00\x00\x06\x00\x08\x00\x04\x00\x06\x00\x00\x00@\x00
\x00\x00\xd0\xff\xff\xff\x00\x00\x01\x03\x10\x00\x00\x00\x1c\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00value\x0
0\x06\x00\x08\x00\x06\x00\x06\x00\x00\x00\x00\x00\x02\x00\x10\x00\x14\x00\x08\x00\x06\x00\x07\x00\x0c\x00\x00\x00\x10\x00\x10\x0
0\x00\x00\x00\x00\x01\x02\x10\x00\x00\x00\x1c\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00id\x00\x00\x08\x00\x0c\
x00\x08\x00\x07\x00\x08\x00\x00\x00\x00\x00\x00\x01@\x00\x00\x00\xff\xff\xff\xff\x18\x01\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00
\x0c\x00\x16\x00\x06\x00\x05\x00\x08\x00\x0c\x00\x0c\x00\x00\x00\x00\x03\x04\x00\x18\x00\x00\x00@1\xd4\x00\x00\x00\x00\x00\x00\x
00\n\x00\x18\x00\x0c\x00\x04\x00\x08\x00\n\x00\x00\x00\x9c\x00\x00\x00\x10\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x
00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x0c5\x00
\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x
a0\x18j\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa0\x18j\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\xf0$\x9f\x00\
x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0$\x9f\x00\x00\x00\x00\x00P\x0c5\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x
00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x
00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8a\xa1\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x
00\x00M\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00P\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00
\x0f\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x00\x00\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00,\x00\x00\x00\x00\x00\x00\x00E\x
00\x00\x00\x00\x00\x00\x00?\x00\x00\x00\x00\x00\x00\x00I\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00I\x00\x00\x00\x
00\x00\x00\x00M\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00L\x00\x00\x00\x00\x00\x00\x00_\x00
and then 15 more MiB of similar data.
Environment:
- Dask version: 2024.1.0
- Python version: 3.10.13.final.0 on client (default on the remote machine, i.e. 3.10.12.final.0)
- Operating System: MacOS on client (default on the remote machine, i.e. Ubuntu minimal 18.04).
- Install method (conda, pip, source): Poetry on client (default on the remote machine, i.e. official Docker image).
Describe the issue:
Dask silently fails to shuffle on a particular column using "p2p" shuffle method on GCP (but not sure if GCP specific issue). I.e.
or
shows the described behaviour.
Minimal Complete Verifiable Example:
Anything else we need to know?:
LocalClusterwith 1 or more workers, all methods, including "p2p", work.and then 15 more MiB of similar data.
Environment: