From 2016eea74cdd6f4a86d2a3e4494e3bb4eaa81ad2 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Mon, 4 May 2026 12:53:21 -0700 Subject: [PATCH] Fix dask aggregate boundary contamination and clean up bookkeeping (#1469) Switch the aggregate dask path's overlap to boundary=np.nan so any pad cells the kernels might read are skipped naturally (the kernels already ignore NaN). Compute the depth-driven minimum chunk size up front, combine it with the scale-driven minimum, and call _ensure_min_chunksize once -- removing the wasted first cumulative-array compute and the roundabout chunk-equality recompute branch. Mirror the same change in _run_dask_cupy. Leave the interp dask paths on boundary='nearest' so they keep matching scipy's mode='nearest' semantics that the eager numpy interp path uses. Add tests that pin dask aggregate min/max/median to bit-equal eager numpy for chunk-spanning windows and for arrays with extreme values on the global edges. --- xrspatial/resample.py | 50 ++++++++++++++--------------- xrspatial/tests/test_resample.py | 54 ++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 25 deletions(-) diff --git a/xrspatial/resample.py b/xrspatial/resample.py index 5eb939782..b2f26a314 100644 --- a/xrspatial/resample.py +++ b/xrspatial/resample.py @@ -621,14 +621,20 @@ def _run_dask_numpy(data, scale_y, scale_x, method): dtype=np.float32, meta=meta) import math - min_size = max(_min_chunksize_for_scale(scale_y), - _min_chunksize_for_scale(scale_x)) - data = _ensure_min_chunksize(data, min_size) - + # Aggregate windows can cross chunk boundaries; size chunks to satisfy + # both the scale-driven minimum and the depth-driven minimum in one pass, + # then build the cumulative arrays once. global_in_h = int(sum(data.chunks[0])) global_in_w = int(sum(data.chunks[1])) global_out_h, global_out_w = _output_shape( global_in_h, global_in_w, scale_y, scale_x) + depth_y = math.ceil(global_in_h / global_out_h) + depth_x = math.ceil(global_in_w / global_out_w) + min_size = max(_min_chunksize_for_scale(scale_y), + _min_chunksize_for_scale(scale_x), + 2 * depth_y + 1, 2 * depth_x + 1) + data = _ensure_min_chunksize(data, min_size) + out_y = _output_chunks(data.chunks[0], scale_y) out_x = _output_chunks(data.chunks[1], scale_x) cum_in_y = np.cumsum([0] + list(data.chunks[0])) @@ -636,22 +642,12 @@ def _run_dask_numpy(data, scale_y, scale_x, method): cum_out_y = np.cumsum([0] + list(out_y)) cum_out_x = np.cumsum([0] + list(out_x)) - # Aggregate windows can cross chunk boundaries; add overlap. - depth_y = math.ceil(global_in_h / global_out_h) - depth_x = math.ceil(global_in_w / global_out_w) - data = _ensure_min_chunksize(data, max(2 * depth_y + 1, 2 * depth_x + 1)) - # Recompute in case rechunk changed layout - if data.chunks[0] != tuple(cum_in_y[1:] - cum_in_y[:-1]): - cum_in_y = np.cumsum([0] + list(data.chunks[0])) - cum_in_x = np.cumsum([0] + list(data.chunks[1])) - out_y = _output_chunks(data.chunks[0], scale_y) - out_x = _output_chunks(data.chunks[1], scale_x) - cum_out_y = np.cumsum([0] + list(out_y)) - cum_out_x = np.cumsum([0] + list(out_x)) - + # boundary=np.nan keeps overlap padding from contaminating the aggregate + # at the global edges. The kernels skip NaN inputs and return NaN for + # empty windows, so the padded region is ignored naturally. from dask.array.overlap import overlap as _add_overlap src = _add_overlap(data, depth={0: depth_y, 1: depth_x}, - boundary='nearest') + boundary=np.nan) fn = partial(_agg_block_np, method=method, global_in_h=global_in_h, global_in_w=global_in_w, @@ -704,18 +700,19 @@ def _run_dask_cupy(data, scale_y, scale_x, method): dtype=cupy.float32, meta=meta) import math - min_size = max(_min_chunksize_for_scale(scale_y), - _min_chunksize_for_scale(scale_x)) - data = _ensure_min_chunksize(data, min_size) - + # Aggregate windows can cross chunk boundaries; size chunks to satisfy + # both the scale-driven minimum and the depth-driven minimum in one pass, + # then build the cumulative arrays once. global_in_h = int(sum(data.chunks[0])) global_in_w = int(sum(data.chunks[1])) global_out_h, global_out_w = _output_shape( global_in_h, global_in_w, scale_y, scale_x) - depth_y = math.ceil(global_in_h / global_out_h) depth_x = math.ceil(global_in_w / global_out_w) - data = _ensure_min_chunksize(data, max(2 * depth_y + 1, 2 * depth_x + 1)) + min_size = max(_min_chunksize_for_scale(scale_y), + _min_chunksize_for_scale(scale_x), + 2 * depth_y + 1, 2 * depth_x + 1) + data = _ensure_min_chunksize(data, min_size) out_y = _output_chunks(data.chunks[0], scale_y) out_x = _output_chunks(data.chunks[1], scale_x) @@ -724,9 +721,12 @@ def _run_dask_cupy(data, scale_y, scale_x, method): cum_out_y = np.cumsum([0] + list(out_y)) cum_out_x = np.cumsum([0] + list(out_x)) + # boundary=np.nan keeps overlap padding from contaminating the aggregate + # at the global edges. The kernels skip NaN inputs and return NaN for + # empty windows, so the padded region is ignored naturally. from dask.array.overlap import overlap as _add_overlap src = _add_overlap(data, depth={0: depth_y, 1: depth_x}, - boundary='nearest') + boundary=cupy.nan) fn = partial(_agg_block_cupy, method=method, global_in_h=global_in_h, global_in_w=global_in_w, diff --git a/xrspatial/tests/test_resample.py b/xrspatial/tests/test_resample.py index d77e30613..d20ade1db 100644 --- a/xrspatial/tests/test_resample.py +++ b/xrspatial/tests/test_resample.py @@ -345,6 +345,60 @@ def test_aggregate_parity(self, numpy_and_dask_rasters, method): atol=1e-5, equal_nan=True) +# --------------------------------------------------------------------------- +# Aggregate dask boundary contamination (issue #1469) +# --------------------------------------------------------------------------- + +@dask_array_available +class TestAggregateDaskBoundary: + """Aggregate dask path must not let overlap-padding bias the result.""" + + @pytest.mark.parametrize('method', ['min', 'max', 'median']) + def test_chunk_spanning_window_bit_identical(self, method): + """Aggregate windows that cross chunk boundaries must equal eager + numpy bit-exactly (same kernel, no padding contribution).""" + rng = np.random.RandomState(20240504) + data = rng.rand(24, 24).astype(np.float32) + np_agg = create_test_raster(data, backend='numpy', + attrs={'res': (1.0, 1.0)}) + # Use a chunk size that does NOT divide the input cleanly so that + # output windows span chunk boundaries. + dk_agg = create_test_raster(data, backend='dask+numpy', + attrs={'res': (1.0, 1.0)}, + chunks=(7, 7)) + np_out = resample(np_agg, scale_factor=0.5, method=method) + dk_out = resample(dk_agg, scale_factor=0.5, method=method) + # min/max/median over the same set of source cells should be + # bit-identical -- no atol slack. + np.testing.assert_array_equal(dk_out.values, np_out.values) + + @pytest.mark.parametrize('method', ['min', 'max', 'median']) + def test_global_edge_extremes_match_eager(self, method): + """Extreme values on the global array edges must not contaminate + adjacent output windows. The aggregate kernel's indexing keeps the + global-edge pad out of any window, and boundary=np.nan reinforces + that contract: pad cells are NaN and would be skipped even if read. + Result must match the eager (non-overlapped) numpy path exactly.""" + # Inner field is in a low range; the rightmost column is an + # extreme value that, if duplicated by 'nearest' padding, would + # show up in adjacent output windows that should not see it. + rng = np.random.RandomState(1469) + data = rng.uniform(0.0, 0.1, size=(20, 20)).astype(np.float32) + data[:, -1] = 999.0 # extreme right column + data[:, 0] = -999.0 # extreme left column + data[-1, :] = 999.0 # extreme bottom row + data[0, :] = -999.0 # extreme top row + + np_agg = create_test_raster(data, backend='numpy', + attrs={'res': (1.0, 1.0)}) + dk_agg = create_test_raster(data, backend='dask+numpy', + attrs={'res': (1.0, 1.0)}, + chunks=(8, 8)) + np_out = resample(np_agg, scale_factor=0.5, method=method) + dk_out = resample(dk_agg, scale_factor=0.5, method=method) + np.testing.assert_array_equal(dk_out.values, np_out.values) + + # --------------------------------------------------------------------------- # CuPy parity # ---------------------------------------------------------------------------