diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 90b999bcc41..f9dbb5e7caf 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -81,6 +81,7 @@
 from distributed.diagnostics.plugin import WorkerPlugin
 from distributed.metrics import time
 from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
+from distributed.shuffle import check_minimal_arrow_version
 from distributed.sizeof import sizeof
 from distributed.utils import (
     NoOpAwaitable,
@@ -3258,11 +3259,17 @@ async def test_cancel_clears_processing(c, s, *workers):
 
 
 def test_default_get(loop_in_thread):
+    has_pyarrow = False
+    try:
+        check_minimal_arrow_version()
+        has_pyarrow = True
+    except RuntimeError:
+        pass
     loop = loop_in_thread
     with cluster() as (s, [a, b]):
         pre_get = dask.base.get_scheduler()
-        # These may change in the future but the selection below shoul dnot
-        distributed_default = "tasks"
+        # These may change in the future but the selection below should not
+        distributed_default = "p2p" if has_pyarrow else "tasks"
         local_default = "disk"
         assert get_default_shuffle_algorithm() == local_default
         with Client(s["address"], set_as_default=True, loop=loop) as c:
diff --git a/distributed/tests/test_dask_collections.py b/distributed/tests/test_dask_collections.py
index 214db022105..12b9417d9ac 100644
--- a/distributed/tests/test_dask_collections.py
+++ b/distributed/tests/test_dask_collections.py
@@ -5,6 +5,7 @@
 np = pytest.importorskip("numpy")
 pd = pytest.importorskip("pandas")
 
+from packaging.version import parse as parse_version
 from pandas.testing import assert_frame_equal, assert_index_equal, assert_series_equal
 
 import dask
@@ -109,6 +110,13 @@ async def test_dask_array_collections(c, s, a, b):
     np.testing.assert_equal(o_local, o_remote)
 
 
+@pytest.mark.skipif(
+    (
+        parse_version(dask.__version__) < parse_version("2023.2.2")
+        and parse_version(dask.__version__) >= parse_version("2023.2.1")
+    ),
+    reason="https://github.com/dask/dask/pull/10005",
+)
 @gen_cluster(client=True)
 async def test_bag_groupby_tasks_default(c, s, a, b):
     b = db.range(100, npartitions=10)
@@ -174,12 +182,12 @@ def test_dataframe_groupby_tasks(client):
     for ind in [lambda x: "A", lambda x: x.A]:
         a = df.groupby(ind(df)).apply(len)
         b = ddf.groupby(ind(ddf)).apply(len, meta=(None, int))
-        assert_equal(a, b.compute(scheduler="sync").sort_index())
+        assert_equal(a, b.compute().sort_index())
         assert not any("partd" in k[0] for k in b.dask)
 
         a = df.groupby(ind(df)).B.apply(len)
         b = ddf.groupby(ind(ddf)).B.apply(len, meta=("B", int))
-        assert_equal(a, b.compute(scheduler="sync").sort_index())
+        assert_equal(a, b.compute().sort_index())
         assert not any("partd" in k[0] for k in b.dask)
 
     with pytest.raises((NotImplementedError, ValueError)):
@@ -188,7 +196,7 @@ def test_dataframe_groupby_tasks(client):
 
     a = df.groupby(["A", "B"]).apply(len)
     b = ddf.groupby(["A", "B"]).apply(len, meta=(None, int))
-    assert_equal(a, b.compute(scheduler="sync").sort_index())
+    assert_equal(a, b.compute().sort_index())
 
 
 @gen_cluster(client=True)
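
Note: a minimal sketch of the pyarrow-detection pattern the first hunk relies on, assuming (as the try/except in test_default_get implies) that check_minimal_arrow_version() raises RuntimeError when pyarrow is missing or older than the p2p shuffle requires:

    # Sketch only; restates the detection logic from the test_client.py hunk.
    from distributed.shuffle import check_minimal_arrow_version

    try:
        check_minimal_arrow_version()
        has_pyarrow = True
    except RuntimeError:
        # No usable pyarrow: p2p shuffle is unavailable.
        has_pyarrow = False

    # With a usable pyarrow the distributed default shuffle is "p2p";
    # otherwise it falls back to "tasks".
    expected_default = "p2p" if has_pyarrow else "tasks"

The skipif marker added in test_dask_collections.py reads as a version window: it skips the test only for dask releases >= 2023.2.1 and < 2023.2.2, i.e. the single release affected by the issue tracked in https://github.com/dask/dask/pull/10005.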