Merged
1 change: 1 addition & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -1118,6 +1118,7 @@ async def test_shuffling(c, s, a, b):
     ss = Shuffling(s)

     df = dask.datasets.timeseries()
+    df["name"] = df["name"].astype("string[python]")
     df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()
     start = time()
     while not ss.source.data["disk_read"]:
21 changes: 21 additions & 0 deletions distributed/shuffle/_arrow.py
@@ -4,9 +4,30 @@
 from typing import TYPE_CHECKING, BinaryIO

 if TYPE_CHECKING:
+    import pandas as pd
     import pyarrow as pa


+def check_dtype_support(meta_input: pd.DataFrame) -> None:
+    import pandas as pd
+
+    for name in meta_input:
+        column = meta_input[name]
+        # FIXME: PyArrow does not support complex numbers: https://issues.apache.org/jira/browse/ARROW-638
+        if pd.api.types.is_complex_dtype(column):
+            raise TypeError(
+                f"p2p does not support data of type '{column.dtype}' found in column '{name}'."
+            )
+        # FIXME: Serializing custom objects to PyArrow is not supported in P2P shuffling
+        if pd.api.types.is_object_dtype(column):
+            raise TypeError(
+                f"p2p does not support custom objects found in column '{name}'."
+            )
+        # FIXME: PyArrow does not support sparse data: https://issues.apache.org/jira/browse/ARROW-8679
+        if pd.api.types.is_sparse(column):
+            raise TypeError(f"p2p does not support sparse data found in column '{name}'")
Comment on lines +17 to +28
Member:
Part of me likes the explicitness of these checks, but I wonder whether they've got full coverage. For example, what happens if someone is using their own custom extension array dtype? My guess is this check wouldn't raise an error and shuffling would fail (though I've not confirmed this).

Maybe there's a simple try/except we could do here instead, to check that shuffling won't work? Would not being able to round-trip through the shuffle-specific serialization be a sufficient check?

Member Author:

You bring up a good point. Let me check what I need to do to reliably trigger an error that we could utilize.

Member:
Also, if it's not straightforward to come up with such a check, we can totally include what you have here and follow up as needed. The current set of changes is already an improvement over the current main branch.

Member:
Can't we just try to round-trip meta? That would not give such a nice message, but it would fail early.

Member Author:
> Can't we just try to round-trip meta? That would not give such a nice message, but it would fail early.

I tried this before going on holiday, and it detected some issues but not all. IIRC, the main problem was that object columns require actual data. We could combine the explicit checks with the round-trip, though; that would give us nice error messages for some cases and a fallback for anything we haven't explicitly checked.

Member:
There is also df._meta_nonempty for cases like this. It might not catch super funky stuff, but that's OK from my POV. After all, the shuffle will fail either way; it's just a question of when it fails, isn't it?

Member Author:
I'll take a look at df._meta_nonempty. It's both about failing early and providing useful feedback to the user.
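
For illustration, a minimal sketch of the round-trip fallback discussed in this thread, assuming it runs against df._meta_nonempty (metadata filled with dummy values, so object columns hold actual data); the helper name and error wording are hypothetical, not part of this PR:

import pandas as pd
import pyarrow as pa


def check_roundtrip_support(meta_nonempty: pd.DataFrame) -> None:
    # Hypothetical catch-all: fail early if the metadata does not survive
    # a pyarrow round trip. It would run after the explicit checks in
    # check_dtype_support, which give targeted messages for known cases.
    try:
        table = pa.Table.from_pandas(meta_nonempty, preserve_index=True)
        table.to_pandas()
    except (pa.ArrowInvalid, pa.ArrowNotImplementedError, pa.ArrowTypeError) as e:
        raise TypeError(
            f"p2p does not support the dtypes found in this DataFrame: "
            f"{dict(meta_nonempty.dtypes)}"
        ) from e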



 def dump_shards(shards: list[bytes], file: BinaryIO) -> None:
     """
     Write multiple shard tables to the file
9 changes: 3 additions & 6 deletions distributed/shuffle/_shuffle.py
@@ -7,6 +7,8 @@
 from dask.highlevelgraph import HighLevelGraph
 from dask.layers import SimpleShuffleLayer

+from distributed.shuffle._arrow import check_dtype_support
+
 logger = logging.getLogger("distributed.shuffle")
 if TYPE_CHECKING:
     import pandas as pd
@@ -94,6 +96,7 @@ def rearrange_by_column_p2p(
 ) -> DataFrame:
     from dask.dataframe import DataFrame

+    check_dtype_support(df._meta)
     npartitions = npartitions or df.npartitions
     token = tokenize(df, column, npartitions)

@@ -103,12 +106,6 @@
         raise TypeError(
             f"p2p requires all column names to be str, found: {unsupported}",
         )
-    for c, dt in empty.dtypes.items():
-        if dt == object:
-            empty[c] = empty[c].astype(
-                "string"
-            )  # TODO: we fail at non-string object dtypes
-    empty[column] = empty[column].astype("int64")  # TODO: this shouldn't be necesssary

     name = f"shuffle-p2p-{token}"
     layer = P2PShuffleLayer(
33 changes: 33 additions & 0 deletions distributed/shuffle/tests/test_graph.py
@@ -29,6 +29,38 @@ def test_basic(client):
     # ^ NOTE: this works because `assert_eq` sorts the rows before comparing


+@pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"])
+def test_raise_on_complex_numbers(dtype):
+    df = dd.from_pandas(
+        pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5
+    )
+    with pytest.raises(
+        TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
+    ):
+        df.shuffle("x", shuffle="p2p")
+
+
+def test_raise_on_custom_objects():
+    class Stub:
+        def __init__(self, value: int) -> None:
+            self.value = value
+
+    df = dd.from_pandas(
+        pd.DataFrame({"x": pd.array([Stub(i) for i in range(10)], dtype="object")}),
+        npartitions=5,
+    )
+    with pytest.raises(TypeError, match="p2p does not support custom objects"):
+        df.shuffle("x", shuffle="p2p")
+
+
+def test_raise_on_sparse_data():
+    df = dd.from_pandas(
+        pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5
+    )
+    with pytest.raises(TypeError, match="p2p does not support sparse data"):
+        df.shuffle("x", shuffle="p2p")
+
+
 def test_raise_on_non_string_column_name():
     df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5)
     with pytest.raises(TypeError, match="p2p requires all column names to be str"):
@@ -43,6 +75,7 @@ def test_does_not_raise_on_stringified_numeric_column_name():
 @gen_cluster([("", 2)] * 4, client=True)
 async def test_basic_state(c, s, *workers):
     df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
+    df["name"] = df["name"].astype("string[python]")
     shuffled = df.shuffle("id", shuffle="p2p")

     exts = [w.extensions["shuffle"] for w in workers]