Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions xrspatial/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,37 +621,33 @@ def _run_dask_numpy(data, scale_y, scale_x, method):
dtype=np.float32, meta=meta)

import math
min_size = max(_min_chunksize_for_scale(scale_y),
_min_chunksize_for_scale(scale_x))
data = _ensure_min_chunksize(data, min_size)

# Aggregate windows can cross chunk boundaries; size chunks to satisfy
# both the scale-driven minimum and the depth-driven minimum in one pass,
# then build the cumulative arrays once.
global_in_h = int(sum(data.chunks[0]))
global_in_w = int(sum(data.chunks[1]))
global_out_h, global_out_w = _output_shape(
global_in_h, global_in_w, scale_y, scale_x)
depth_y = math.ceil(global_in_h / global_out_h)
depth_x = math.ceil(global_in_w / global_out_w)
min_size = max(_min_chunksize_for_scale(scale_y),
_min_chunksize_for_scale(scale_x),
2 * depth_y + 1, 2 * depth_x + 1)
data = _ensure_min_chunksize(data, min_size)

out_y = _output_chunks(data.chunks[0], scale_y)
out_x = _output_chunks(data.chunks[1], scale_x)
cum_in_y = np.cumsum([0] + list(data.chunks[0]))
cum_in_x = np.cumsum([0] + list(data.chunks[1]))
cum_out_y = np.cumsum([0] + list(out_y))
cum_out_x = np.cumsum([0] + list(out_x))

# Aggregate windows can cross chunk boundaries; add overlap.
depth_y = math.ceil(global_in_h / global_out_h)
depth_x = math.ceil(global_in_w / global_out_w)
data = _ensure_min_chunksize(data, max(2 * depth_y + 1, 2 * depth_x + 1))
# Recompute in case rechunk changed layout
if data.chunks[0] != tuple(cum_in_y[1:] - cum_in_y[:-1]):
cum_in_y = np.cumsum([0] + list(data.chunks[0]))
cum_in_x = np.cumsum([0] + list(data.chunks[1]))
out_y = _output_chunks(data.chunks[0], scale_y)
out_x = _output_chunks(data.chunks[1], scale_x)
cum_out_y = np.cumsum([0] + list(out_y))
cum_out_x = np.cumsum([0] + list(out_x))

# boundary=np.nan keeps overlap padding from contaminating the aggregate
# at the global edges. The kernels skip NaN inputs and return NaN for
# empty windows, so the padded region is ignored naturally.
from dask.array.overlap import overlap as _add_overlap
src = _add_overlap(data, depth={0: depth_y, 1: depth_x},
boundary='nearest')
boundary=np.nan)

fn = partial(_agg_block_np, method=method,
global_in_h=global_in_h, global_in_w=global_in_w,
Expand Down Expand Up @@ -704,18 +700,19 @@ def _run_dask_cupy(data, scale_y, scale_x, method):
dtype=cupy.float32, meta=meta)

import math
min_size = max(_min_chunksize_for_scale(scale_y),
_min_chunksize_for_scale(scale_x))
data = _ensure_min_chunksize(data, min_size)

# Aggregate windows can cross chunk boundaries; size chunks to satisfy
# both the scale-driven minimum and the depth-driven minimum in one pass,
# then build the cumulative arrays once.
global_in_h = int(sum(data.chunks[0]))
global_in_w = int(sum(data.chunks[1]))
global_out_h, global_out_w = _output_shape(
global_in_h, global_in_w, scale_y, scale_x)

depth_y = math.ceil(global_in_h / global_out_h)
depth_x = math.ceil(global_in_w / global_out_w)
data = _ensure_min_chunksize(data, max(2 * depth_y + 1, 2 * depth_x + 1))
min_size = max(_min_chunksize_for_scale(scale_y),
_min_chunksize_for_scale(scale_x),
2 * depth_y + 1, 2 * depth_x + 1)
data = _ensure_min_chunksize(data, min_size)

out_y = _output_chunks(data.chunks[0], scale_y)
out_x = _output_chunks(data.chunks[1], scale_x)
Expand All @@ -724,9 +721,12 @@ def _run_dask_cupy(data, scale_y, scale_x, method):
cum_out_y = np.cumsum([0] + list(out_y))
cum_out_x = np.cumsum([0] + list(out_x))

# boundary=np.nan keeps overlap padding from contaminating the aggregate
# at the global edges. The kernels skip NaN inputs and return NaN for
# empty windows, so the padded region is ignored naturally.
from dask.array.overlap import overlap as _add_overlap
src = _add_overlap(data, depth={0: depth_y, 1: depth_x},
boundary='nearest')
boundary=cupy.nan)

fn = partial(_agg_block_cupy, method=method,
global_in_h=global_in_h, global_in_w=global_in_w,
Expand Down
54 changes: 54 additions & 0 deletions xrspatial/tests/test_resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,60 @@ def test_aggregate_parity(self, numpy_and_dask_rasters, method):
atol=1e-5, equal_nan=True)


# ---------------------------------------------------------------------------
# Aggregate dask boundary contamination (issue #1469)
# ---------------------------------------------------------------------------

@dask_array_available
class TestAggregateDaskBoundary:
"""Aggregate dask path must not let overlap-padding bias the result."""

@pytest.mark.parametrize('method', ['min', 'max', 'median'])
def test_chunk_spanning_window_bit_identical(self, method):
"""Aggregate windows that cross chunk boundaries must equal eager
numpy bit-exactly (same kernel, no padding contribution)."""
rng = np.random.RandomState(20240504)
data = rng.rand(24, 24).astype(np.float32)
np_agg = create_test_raster(data, backend='numpy',
attrs={'res': (1.0, 1.0)})
# Use a chunk size that does NOT divide the input cleanly so that
# output windows span chunk boundaries.
dk_agg = create_test_raster(data, backend='dask+numpy',
attrs={'res': (1.0, 1.0)},
chunks=(7, 7))
np_out = resample(np_agg, scale_factor=0.5, method=method)
dk_out = resample(dk_agg, scale_factor=0.5, method=method)
# min/max/median over the same set of source cells should be
# bit-identical -- no atol slack.
np.testing.assert_array_equal(dk_out.values, np_out.values)

@pytest.mark.parametrize('method', ['min', 'max', 'median'])
def test_global_edge_extremes_match_eager(self, method):
"""Extreme values on the global array edges must not contaminate
adjacent output windows. The aggregate kernel's indexing keeps the
global-edge pad out of any window, and boundary=np.nan reinforces
that contract: pad cells are NaN and would be skipped even if read.
Result must match the eager (non-overlapped) numpy path exactly."""
# Inner field is in a low range; the rightmost column is an
# extreme value that, if duplicated by 'nearest' padding, would
# show up in adjacent output windows that should not see it.
rng = np.random.RandomState(1469)
data = rng.uniform(0.0, 0.1, size=(20, 20)).astype(np.float32)
data[:, -1] = 999.0 # extreme right column
data[:, 0] = -999.0 # extreme left column
data[-1, :] = 999.0 # extreme bottom row
data[0, :] = -999.0 # extreme top row

np_agg = create_test_raster(data, backend='numpy',
attrs={'res': (1.0, 1.0)})
dk_agg = create_test_raster(data, backend='dask+numpy',
attrs={'res': (1.0, 1.0)},
chunks=(8, 8))
np_out = resample(np_agg, scale_factor=0.5, method=method)
dk_out = resample(dk_agg, scale_factor=0.5, method=method)
np.testing.assert_array_equal(dk_out.values, np_out.values)


# ---------------------------------------------------------------------------
# CuPy parity
# ---------------------------------------------------------------------------
Expand Down
Loading