Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from distributed.diagnostics.plugin import WorkerPlugin
from distributed.metrics import time
from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
from distributed.shuffle import check_minimal_arrow_version
from distributed.sizeof import sizeof
from distributed.utils import (
NoOpAwaitable,
Expand Down Expand Up @@ -3258,11 +3259,17 @@ async def test_cancel_clears_processing(c, s, *workers):


def test_default_get(loop_in_thread):
has_pyarrow = False
try:
check_minimal_arrow_version()
has_pyarrow = True
except RuntimeError:
pass
loop = loop_in_thread
with cluster() as (s, [a, b]):
pre_get = dask.base.get_scheduler()
# These may change in the future but the selection below shoul dnot
distributed_default = "tasks"
# These may change in the future but the selection below should not
distributed_default = "p2p" if has_pyarrow else "tasks"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is we'll also want to use check_minimal_arrow_version() to make sure we account for pyarrow<7 being installed

local_default = "disk"
assert get_default_shuffle_algorithm() == local_default
with Client(s["address"], set_as_default=True, loop=loop) as c:
Expand Down
14 changes: 11 additions & 3 deletions distributed/tests/test_dask_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
np = pytest.importorskip("numpy")
pd = pytest.importorskip("pandas")

from packaging.version import parse as parse_version
from pandas.testing import assert_frame_equal, assert_index_equal, assert_series_equal

import dask
Expand Down Expand Up @@ -109,6 +110,13 @@ async def test_dask_array_collections(c, s, a, b):
np.testing.assert_equal(o_local, o_remote)


@pytest.mark.skipif(
(
parse_version(dask.__version__) < parse_version("2023.2.2")
and parse_version(dask.__version__) >= parse_version("2023.2.1")
),
reason="https://github.com/dask/dask/pull/10005",
)
@gen_cluster(client=True)
async def test_bag_groupby_tasks_default(c, s, a, b):
Comment on lines +113 to 121
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to skip this test to unblock our CI. dask/dask#10005 will introduce the same test in dask/dask, i.e. we could even remove it from distributed. keeping it here for early failures is still nice since the test is very cheap

b = db.range(100, npartitions=10)
Expand Down Expand Up @@ -174,12 +182,12 @@ def test_dataframe_groupby_tasks(client):
for ind in [lambda x: "A", lambda x: x.A]:
a = df.groupby(ind(df)).apply(len)
b = ddf.groupby(ind(ddf)).apply(len, meta=(None, int))
assert_equal(a, b.compute(scheduler="sync").sort_index())
assert_equal(a, b.compute().sort_index())
assert not any("partd" in k[0] for k in b.dask)

a = df.groupby(ind(df)).B.apply(len)
b = ddf.groupby(ind(ddf)).B.apply(len, meta=("B", int))
assert_equal(a, b.compute(scheduler="sync").sort_index())
assert_equal(a, b.compute().sort_index())
assert not any("partd" in k[0] for k in b.dask)

with pytest.raises((NotImplementedError, ValueError)):
Expand All @@ -188,7 +196,7 @@ def test_dataframe_groupby_tasks(client):
a = df.groupby(["A", "B"]).apply(len)
b = ddf.groupby(["A", "B"]).apply(len, meta=(None, int))

assert_equal(a, b.compute(scheduler="sync").sort_index())
assert_equal(a, b.compute().sort_index())


@gen_cluster(client=True)
Expand Down