From b3cf8d078a08ec2a45fdc88697907b6a3db0b252 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 May 2023 18:15:34 +0200 Subject: [PATCH 01/12] Rename --- distributed/shuffle/_worker_extension.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/shuffle/_worker_extension.py b/distributed/shuffle/_worker_extension.py index edf70df9562..cff184b225e 100644 --- a/distributed/shuffle/_worker_extension.py +++ b/distributed/shuffle/_worker_extension.py @@ -302,7 +302,7 @@ def __init__( memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, ): - from dask.array.rechunk import _old_to_new + from dask.array.rechunk import old_to_new super().__init__( id=id, @@ -324,7 +324,7 @@ def __init__( self.partitions_of = dict(partitions_of) self.worker_for = worker_for self._slicing = rechunk_slicing(old, new) - self._old_to_new = _old_to_new(old, new) + self._old_to_new = old_to_new(old, new) async def _receive(self, data: list[tuple[ArrayRechunkShardID, bytes]]) -> None: self.raise_if_closed() From f5226b34ce7f62c1ed1f61aab23b3bef9b0b0297 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 24 May 2023 21:53:29 +0200 Subject: [PATCH 02/12] [skip-caching] From a10e1eb95e73694347451f1e8dbb584fd14a88a7 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 24 May 2023 16:02:27 -0500 Subject: [PATCH 03/12] [skip-caching] From 62a896aa96b02cdffdf90a55f0c982261c2e5137 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 24 May 2023 16:28:11 -0500 Subject: [PATCH 04/12] Try invalidating cache [skip-caching] --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5c030e105f1..1c95c4a7c58 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -63,5 +63,5 @@ repos: - pytest - tornado - pyarrow - - git+https://github.com/dask/dask + - git+https://github.com/dask/dask.git - git+https://github.com/dask/zict From b6503efd725a37101f437d86bc8eeeeef7a5dba8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 09:40:47 +0200 Subject: [PATCH 05/12] Align tests with dask/dask#10027 --- distributed/shuffle/tests/test_rechunk.py | 36 ++++++++++++++++++----- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index 609b7a04711..f2b645ea452 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -463,6 +463,28 @@ async def test_rechunk_same(c, s, *ws): assert x is y +def test_rechunk_same_fully_unknown(): + dd = pytest.importorskip("dask.dataframe") + x = da.ones(shape=(10, 10), chunks=(5, 10)) + y = dd.from_array(x).values + new_chunks = ((np.nan, np.nan), (10,)) + assert y.chunks == new_chunks + result = y.rechunk(new_chunks) + assert y is result + + +def test_rechunk_same_fully_unknown_floats(): + """Similar to test_rechunk_same_fully_unknown but testing the behavior if + ``float("nan")`` is used instead of the recommended ``np.nan`` + """ + dd = pytest.importorskip("dask.dataframe") + x = da.ones(shape=(10, 10), chunks=(5, 10)) + y = dd.from_array(x).values + new_chunks = ((float("nan"), float("nan")), (10,)) + result = y.rechunk(new_chunks) + assert y is result + + @gen_cluster(client=True) async def test_rechunk_with_zero_placeholders(c, s, *ws): """ @@ -584,7 +606,7 @@ async def test_rechunk_unknown_explicit(c, s, *ws): dd = pytest.importorskip("dask.dataframe") x = da.ones(shape=(10, 10), chunks=(5, 2)) y = dd.from_array(x).values - result = y.rechunk(((float("nan"), float("nan")), (5, 5)), method="p2p") + result = y.rechunk(((np.nan, np.nan), (5, 5)), method="p2p") expected = x.rechunk((None, (5, 5)), method="p2p") assert_chunks_match(result.chunks, expected.chunks) assert_eq(await c.compute(result), await c.compute(expected)) @@ -880,8 +902,8 @@ def test_rechunk_slicing_nan(): -------- dask.array.tests.test_rechunk.test_intersect_nan """ - old_chunks = ((float("nan"), float("nan")), (8,)) - new_chunks = ((float("nan"), float("nan")), (4, 4)) + old_chunks = ((np.nan, np.nan), (8,)) + new_chunks = ((np.nan, np.nan), (4, 4)) result = rechunk_slicing(old_chunks, new_chunks) expected = { (0, 0): [ @@ -908,8 +930,8 @@ def test_rechunk_slicing_nan_single(): -------- dask.array.tests.test_rechunk.test_intersect_nan_single """ - old_chunks = ((float("nan"),), (10,)) - new_chunks = ((float("nan"),), (5, 5)) + old_chunks = ((np.nan,), (10,)) + new_chunks = ((np.nan,), (5, 5)) result = rechunk_slicing(old_chunks, new_chunks) expected = { @@ -927,8 +949,8 @@ def test_rechunk_slicing_nan_long(): -------- dask.array.tests.test_rechunk.test_intersect_nan_long """ - old_chunks = (tuple([float("nan")] * 4), (10,)) - new_chunks = (tuple([float("nan")] * 4), (5, 5)) + old_chunks = (tuple([np.nan] * 4), (10,)) + new_chunks = (tuple([np.nan] * 4), (5, 5)) result = rechunk_slicing(old_chunks, new_chunks) expected = { (0, 0): [ From 85170e06d32039ac18b56ae8883df4601b699033 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 09:45:43 +0200 Subject: [PATCH 06/12] Align tests with dask/dask#10003 --- distributed/shuffle/tests/test_rechunk.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index f2b645ea452..438cbe317f5 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -83,7 +83,9 @@ async def test_lowlevel_rechunk( ind_chunks = [[(i, x) for i, x in enumerate(dim)] for dim in old] ind_chunks = [list(zip(x, y)) for x, y in product(*ind_chunks)] - old_chunks = {idx: np.random.random(chunk) for idx, chunk in ind_chunks} + old_chunks = { + idx: np.random.default_rng().random(chunk) for idx, chunk in ind_chunks + } workers = list("abcdefghijklmn")[:n_workers] @@ -161,7 +163,7 @@ async def test_rechunk_configuration(c, s, *ws, config_value, keyword): -------- dask.array.tests.test_rechunk.test_rechunk_1d """ - a = np.random.uniform(0, 1, 30) + a = np.random.default_rng().uniform(0, 1, 30) x = da.from_array(a, chunks=((10,) * 3,)) new = ((6,) * 5,) config = {"array.rechunk.method": config_value} if config_value is not None else {} @@ -185,7 +187,7 @@ async def test_rechunk_2d(c, s, *ws): -------- dask.array.tests.test_rechunk.test_rechunk_2d """ - a = np.random.uniform(0, 1, 300).reshape((10, 30)) + a = np.random.default_rng().uniform(0, 1, 300).reshape((10, 30)) x = da.from_array(a, chunks=((1, 2, 3, 4), (5,) * 6)) new = ((5, 5), (15,) * 2) x2 = rechunk(x, chunks=new, method="p2p") @@ -202,7 +204,7 @@ async def test_rechunk_4d(c, s, *ws): dask.array.tests.test_rechunk.test_rechunk_4d """ old = ((5, 5),) * 4 - a = np.random.uniform(0, 1, 10000).reshape((10,) * 4) + a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4) x = da.from_array(a, chunks=old) new = ( (10,), @@ -225,7 +227,7 @@ async def test_rechunk_with_single_output_chunk_raises(c, s, *ws): dask.array.tests.test_rechunk.test_rechunk_4d """ old = ((5, 5),) * 4 - a = np.random.uniform(0, 1, 10000).reshape((10,) * 4) + a = np.default_rng().uniform(0, 1, 10000).reshape((10,) * 4) x = da.from_array(a, chunks=old) new = ((10,),) * 4 x2 = rechunk(x, chunks=new, method="p2p") @@ -244,7 +246,7 @@ async def test_rechunk_expand(c, s, *ws): -------- dask.array.tests.test_rechunk.test_rechunk_expand """ - a = np.random.uniform(0, 1, 100).reshape((10, 10)) + a = np.random.default_rng().uniform(0, 1, 100).reshape((10, 10)) x = da.from_array(a, chunks=(5, 5)) y = x.rechunk(chunks=((3, 3, 3, 1), (3, 3, 3, 1)), method="p2p") assert np.all(await c.compute(y) == a) @@ -258,7 +260,7 @@ async def test_rechunk_expand2(c, s, *ws): dask.array.tests.test_rechunk.test_rechunk_expand2 """ (a, b) = (3, 2) - orig = np.random.uniform(0, 1, a**b).reshape((a,) * b) + orig = np.random.default_rng().uniform(0, 1, a**b).reshape((a,) * b) for off, off2 in product(range(1, a - 1), range(1, a - 1)): old = ((a - off, off),) * b x = da.from_array(orig, chunks=old) @@ -280,7 +282,7 @@ async def test_rechunk_method(c, s, *ws): """ old = ((5, 2, 3),) * 4 new = ((3, 3, 3, 1),) * 4 - a = np.random.uniform(0, 1, 10000).reshape((10,) * 4) + a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4) x = da.from_array(a, chunks=old) x2 = x.rechunk(chunks=new, method="p2p") assert x2.chunks == new @@ -298,7 +300,7 @@ async def test_rechunk_blockshape(c, s, *ws): new_shape, new_chunks = (10, 10), (4, 3) new_blockdims = normalize_chunks(new_chunks, new_shape) old_chunks = ((4, 4, 2), (3, 3, 3, 1)) - a = np.random.uniform(0, 1, 100).reshape((10, 10)) + a = np.random.default_rng().uniform(0, 1, 100).reshape((10, 10)) x = da.from_array(a, chunks=old_chunks) check1 = rechunk(x, chunks=new_chunks, method="p2p") assert check1.chunks == new_blockdims @@ -535,7 +537,7 @@ async def test_rechunk_unknown_from_pandas(c, s, *ws): dd = pytest.importorskip("dask.dataframe") pd = pytest.importorskip("pandas") - arr = np.random.randn(50, 10) + arr = np.default_rng().standard_normal((50, 10)) x = dd.from_pandas(pd.DataFrame(arr), 2).values result = x.rechunk((None, (5, 5)), method="p2p") assert np.isnan(x.chunks[0]).all() From 9728331cb3c356bbb6117b60bd3acb9aa9c84dce Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 10:35:08 +0200 Subject: [PATCH 07/12] Align tests with dask/dask#10157 --- distributed/shuffle/tests/test_rechunk.py | 114 ++++++++++++++++++---- 1 file changed, 95 insertions(+), 19 deletions(-) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index 438cbe317f5..523ac3987ef 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import math import random import warnings @@ -227,7 +228,7 @@ async def test_rechunk_with_single_output_chunk_raises(c, s, *ws): dask.array.tests.test_rechunk.test_rechunk_4d """ old = ((5, 5),) * 4 - a = np.default_rng().uniform(0, 1, 10000).reshape((10,) * 4) + a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4) x = da.from_array(a, chunks=old) new = ((10,),) * 4 x2 = rechunk(x, chunks=new, method="p2p") @@ -465,28 +466,56 @@ async def test_rechunk_same(c, s, *ws): assert x is y -def test_rechunk_same_fully_unknown(): +@gen_cluster(client=True) +async def test_rechunk_same_fully_unknown(c, s, *ws): + """ + See Also + -------- + dask.array.tests.test_rechunk.test_rechunk_same_fully_unknown + """ dd = pytest.importorskip("dask.dataframe") x = da.ones(shape=(10, 10), chunks=(5, 10)) y = dd.from_array(x).values new_chunks = ((np.nan, np.nan), (10,)) assert y.chunks == new_chunks - result = y.rechunk(new_chunks) + result = y.rechunk(new_chunks, method="p2p") assert y is result -def test_rechunk_same_fully_unknown_floats(): +@gen_cluster(client=True) +async def test_rechunk_same_fully_unknown_floats(c, s, *ws): """Similar to test_rechunk_same_fully_unknown but testing the behavior if ``float("nan")`` is used instead of the recommended ``np.nan`` + + See Also + -------- + dask.array.tests.test_rechunk.test_rechunk_same_fully_unknown_floats """ dd = pytest.importorskip("dask.dataframe") x = da.ones(shape=(10, 10), chunks=(5, 10)) y = dd.from_array(x).values new_chunks = ((float("nan"), float("nan")), (10,)) - result = y.rechunk(new_chunks) + result = y.rechunk(new_chunks, method="p2p") assert y is result +@gen_cluster(client=True) +async def test_rechunk_same_partially_unknown(c, s, *ws): + """ + See Also + -------- + dask.array.tests.test_rechunk.test_rechunk_same_partially_unknown + """ + dd = pytest.importorskip("dask.dataframe") + x = da.ones(shape=(10, 10), chunks=(5, 10)) + y = dd.from_array(x).values + z = da.concatenate([x, y]) + new_chunks = ((5, 5, np.nan, np.nan), (10,)) + assert z.chunks == new_chunks + result = z.rechunk(new_chunks, method="p2p") + assert z is result + + @gen_cluster(client=True) async def test_rechunk_with_zero_placeholders(c, s, *ws): """ @@ -537,7 +566,7 @@ async def test_rechunk_unknown_from_pandas(c, s, *ws): dd = pytest.importorskip("dask.dataframe") pd = pytest.importorskip("pandas") - arr = np.default_rng().standard_normal((50, 10)) + arr = np.random.default_rng().standard_normal((50, 10)) x = dd.from_pandas(pd.DataFrame(arr), 2).values result = x.rechunk((None, (5, 5)), method="p2p") assert np.isnan(x.chunks[0]).all() @@ -583,11 +612,11 @@ async def test_rechunk_unknown_from_array(c, s, *ws): ], ) @gen_cluster(client=True) -async def test_rechunk_unknown(c, s, *ws, x, chunks): +async def test_rechunk_with_fully_unknown_dimension(c, s, *ws, x, chunks): """ See Also -------- - dask.array.tests.test_rechunk.test_rechunk_unknown + dask.array.tests.test_rechunk.test_rechunk_with_fully_unknown_dimension """ dd = pytest.importorskip("dask.dataframe") y = dd.from_array(x).values @@ -598,28 +627,67 @@ async def test_rechunk_unknown(c, s, *ws, x, chunks): assert_eq(await c.compute(result), await c.compute(expected)) +@pytest.mark.parametrize( + "x, chunks", + [ + (da.ones(shape=(50, 10), chunks=(25, 10)), (None, 5)), + (da.ones(shape=(50, 10), chunks=(25, 10)), {1: 5}), + (da.ones(shape=(50, 10), chunks=(25, 10)), (None, (5, 5))), + (da.ones(shape=(1000, 10), chunks=(5, 10)), (None, 5)), + (da.ones(shape=(1000, 10), chunks=(5, 10)), {1: 5}), + (da.ones(shape=(1000, 10), chunks=(5, 10)), (None, (5, 5))), + (da.ones(shape=(10, 10), chunks=(10, 10)), (None, 5)), + (da.ones(shape=(10, 10), chunks=(10, 10)), {1: 5}), + (da.ones(shape=(10, 10), chunks=(10, 10)), (None, (5, 5))), + (da.ones(shape=(10, 10), chunks=(10, 2)), (None, 5)), + (da.ones(shape=(10, 10), chunks=(10, 2)), {1: 5}), + (da.ones(shape=(10, 10), chunks=(10, 2)), (None, (5, 5))), + ], +) +@gen_cluster(client=True) +async def test_rechunk_with_partially_unknown_dimension(c, s, *ws, x, chunks): + """ + See Also + -------- + dask.array.tests.test_rechunk.test_rechunk_with_partially_unknown_dimension + """ + dd = pytest.importorskip("dask.dataframe") + y = dd.from_array(x).values + z = da.concatenate([x, y]) + xx = da.concatenate([x, x]) + result = z.rechunk(chunks, method="p2p") + expected = xx.rechunk(chunks, method="p2p") + assert_chunks_match(result.chunks, expected.chunks) + assert_eq(await c.compute(result), await c.compute(expected)) + + +@pytest.mark.parametrize( + "new_chunks", + [ + ((np.nan, np.nan), (5, 5)), + ((math.nan, math.nan), (5, 5)), + ((float("nan"), float("nan")), (5, 5)), + ], +) @gen_cluster(client=True) -async def test_rechunk_unknown_explicit(c, s, *ws): +async def test_rechunk_with_fully_unknown_dimension_explicit(c, s, *ws, new_chunks): """ See Also -------- - dask.array.tests.test_rechunk.test_rechunk_unknown_explicit + dask.array.tests.test_rechunk.test_rechunk_with_fully_unknown_dimension_explicit """ dd = pytest.importorskip("dask.dataframe") x = da.ones(shape=(10, 10), chunks=(5, 2)) y = dd.from_array(x).values - result = y.rechunk(((np.nan, np.nan), (5, 5)), method="p2p") + result = y.rechunk(new_chunks, method="p2p") expected = x.rechunk((None, (5, 5)), method="p2p") assert_chunks_match(result.chunks, expected.chunks) assert_eq(await c.compute(result), await c.compute(expected)) def assert_chunks_match(left, right): - for x, y in zip(left, right): - if np.isnan(x).any(): - assert np.isnan(x).all() - else: - assert x == y + for ldim, rdim in zip(left, right): + assert all(np.isnan(l) or l == r for l, r in zip(ldim, rdim)) @gen_cluster(client=True) @@ -631,9 +699,17 @@ async def test_rechunk_unknown_raises(c, s, *ws): """ dd = pytest.importorskip("dask.dataframe") - x = dd.from_array(da.ones(shape=(10, 10), chunks=(5, 5))).values - with pytest.raises(ValueError): - x.rechunk((None, (5, 5, 5)), method="p2p") + x = da.ones(shape=(10, 10), chunks=(5, 5)) + y = dd.from_array(x).values + with pytest.raises(ValueError, match="Chunks do not add"): + y.rechunk((None, (5, 5, 5)), method="p2p") + + with pytest.raises(ValueError, match="Chunks must be unchanging"): + y.rechunk(((5, 5), (5, 5)), method="p2p") + + with pytest.raises(ValueError, match="Chunks must be unchanging"): + z = da.concatenate([x, y]) + z.rechunk(((5, 3, 2, np.nan, np.nan), (5, 5)), method="p2p") @gen_cluster(client=True) From 1044b517429ebf0d21401084533cbccfcc855b52 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 10:39:02 +0200 Subject: [PATCH 08/12] Fix handling of unknown chunk sizes --- distributed/shuffle/_rechunk.py | 31 ++++++++++++++++++++++++ distributed/shuffle/_worker_extension.py | 4 +++ 2 files changed, 35 insertions(+) diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 738c8034f79..775813f5a18 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -72,6 +72,37 @@ def rechunk_p2p(x: da.Array, chunks: ChunkedAxes) -> da.Array: # Special case for empty array, as the algorithm below does not behave correctly return da.empty(x.shape, chunks=chunks, dtype=x.dtype) + import math + from itertools import compress + + old_chunks = x.chunks + new_chunks = chunks + + def is_unknown(dim: ChunkedAxis) -> bool: + return any(math.isnan(chunk) for chunk in dim) + + old_is_unknown = [is_unknown(dim) for dim in old_chunks] + new_is_unknown = [is_unknown(dim) for dim in new_chunks] + + if old_is_unknown != new_is_unknown or any( + new != old for new, old in compress(zip(old_chunks, new_chunks), old_is_unknown) + ): + raise ValueError( + "Chunks must be unchanging along dimensions with missing values.\n\n" + "A possible solution:\n x.compute_chunk_sizes()" + ) + + old_known = [dim for dim, unknown in zip(old_chunks, old_is_unknown) if not unknown] + new_known = [dim for dim, unknown in zip(new_chunks, new_is_unknown) if not unknown] + + old_sizes = [sum(o) for o in old_known] + new_sizes = [sum(n) for n in new_known] + + if old_sizes != new_sizes: + raise ValueError( + f"Cannot change dimensions from {old_sizes!r} to {new_sizes!r}" + ) + dsk: dict = {} token = tokenize(x, chunks) _barrier_key = barrier_key(ShuffleId(token)) diff --git a/distributed/shuffle/_worker_extension.py b/distributed/shuffle/_worker_extension.py index 4ef6a6a7b6c..254cc51f58c 100644 --- a/distributed/shuffle/_worker_extension.py +++ b/distributed/shuffle/_worker_extension.py @@ -315,6 +315,10 @@ def __init__( memory_limiter_comms=memory_limiter_comms, memory_limiter_disk=memory_limiter_disk, ) + from dask.array.core import normalize_chunks + + old = normalize_chunks(old) + new = normalize_chunks(new) self.old = old self.new = new partitions_of = defaultdict(list) From 16ea91d32d3fc18dca2b95c943e197e96b2c0e47 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 10:43:10 +0200 Subject: [PATCH 09/12] Imports --- distributed/shuffle/_rechunk.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 775813f5a18..e58d0608f99 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -1,7 +1,8 @@ from __future__ import annotations +import math from collections import defaultdict -from itertools import product +from itertools import compress, product from typing import TYPE_CHECKING, NamedTuple import dask @@ -72,9 +73,6 @@ def rechunk_p2p(x: da.Array, chunks: ChunkedAxes) -> da.Array: # Special case for empty array, as the algorithm below does not behave correctly return da.empty(x.shape, chunks=chunks, dtype=x.dtype) - import math - from itertools import compress - old_chunks = x.chunks new_chunks = chunks From a7ae0070c2fc73ce0dda304eeac52f1f6046c370 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 12:20:21 +0200 Subject: [PATCH 10/12] Skip tests due to performance problems --- distributed/shuffle/tests/test_rechunk.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index 523ac3987ef..a9e5996ff97 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -633,9 +633,21 @@ async def test_rechunk_with_fully_unknown_dimension(c, s, *ws, x, chunks): (da.ones(shape=(50, 10), chunks=(25, 10)), (None, 5)), (da.ones(shape=(50, 10), chunks=(25, 10)), {1: 5}), (da.ones(shape=(50, 10), chunks=(25, 10)), (None, (5, 5))), - (da.ones(shape=(1000, 10), chunks=(5, 10)), (None, 5)), - (da.ones(shape=(1000, 10), chunks=(5, 10)), {1: 5}), - (da.ones(shape=(1000, 10), chunks=(5, 10)), (None, (5, 5))), + pytest.param( + da.ones(shape=(1000, 10), chunks=(5, 10)), + (None, 5), + marks=pytest.mark.skip(reason="distributed#7757"), + ), + pytest.param( + da.ones(shape=(1000, 10), chunks=(5, 10)), + {1: 5}, + marks=pytest.mark.skip(reason="distributed#7757"), + ), + pytest.param( + da.ones(shape=(1000, 10), chunks=(5, 10)), + (None, (5, 5)), + marks=pytest.mark.skip(reason="distributed#7757"), + ), (da.ones(shape=(10, 10), chunks=(10, 10)), (None, 5)), (da.ones(shape=(10, 10), chunks=(10, 10)), {1: 5}), (da.ones(shape=(10, 10), chunks=(10, 10)), (None, (5, 5))), From f681720e08c7a8827985704baeef38c52370a887 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 14:25:51 +0200 Subject: [PATCH 11/12] Comment --- distributed/shuffle/_worker_extension.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/shuffle/_worker_extension.py b/distributed/shuffle/_worker_extension.py index 254cc51f58c..30a031fc46a 100644 --- a/distributed/shuffle/_worker_extension.py +++ b/distributed/shuffle/_worker_extension.py @@ -317,6 +317,10 @@ def __init__( ) from dask.array.core import normalize_chunks + # We rely on a canonical `np.nan` in `dask.array.rechunk.old_to_new` + # that passes an implicit identity check when testing for list equality. + # This does not work with (de)serialization, so we have to normalize the chunks + # here again to canonicalize `nan`s. old = normalize_chunks(old) new = normalize_chunks(new) self.old = old From b6fe358d268859cfa5fff03bae44184122789155 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 25 May 2023 14:50:10 +0200 Subject: [PATCH 12/12] pre-commit --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c95c4a7c58..5c030e105f1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -63,5 +63,5 @@ repos: - pytest - tornado - pyarrow - - git+https://github.com/dask/dask.git + - git+https://github.com/dask/dask - git+https://github.com/dask/zict