From a5d9ed8d581acc514a48c91abf1219efa7f93d46 Mon Sep 17 00:00:00 2001
From: Brendan Collins
Date: Fri, 1 May 2026 20:32:27 -0700
Subject: [PATCH] Lazy assembly for hand_mfd dask path (#1416)

The dask assembly phase in `hand_mfd._hand_mfd_dask` previously computed
every tile via `blocks[iy, ix].compute()` into a list-of-lists of numpy
arrays, then called `da.block(rows)`. Driver memory held the full grid at
once, defeating the per-tile streaming used elsewhere in the hydro
subpackage.

Replace the eager block-list with `da.map_blocks` over
`(flow_accum_da, elev_da)`, slicing `fractions_da` per tile inside the
closure. The converged `boundaries` and `frac_bdry` snapshots are captured
by the closure so each tile is recomputed lazily on demand. Matches the
pattern already used by `hand_dinf._hand_dinf_dask`.

The boundary-propagation phase already streamed one tile at a time, so it
is unchanged.

Add two regression tests:

- `test_dask_result_is_lazy` checks the result preserves chunking.
- `test_dask_assembly_does_not_materialize_all_tiles` checks the dask graph
  contains one task per output chunk rather than collapsing to a single
  materialized array.
---
 .claude/sweep-performance-state.csv    |  1 +
 xrspatial/hydro/hand_mfd.py            | 72 +++++++++++++++-----------
 xrspatial/hydro/tests/test_hand_mfd.py | 55 ++++++++++++++++++++
 3 files changed, 99 insertions(+), 29 deletions(-)

diff --git a/.claude/sweep-performance-state.csv b/.claude/sweep-performance-state.csv
index abfb7044..0b5659c1 100644
--- a/.claude/sweep-performance-state.csv
+++ b/.claude/sweep-performance-state.csv
@@ -21,6 +21,7 @@ geodesic,2026-03-31T18:00:00Z,N/A,compute-bound,0,,
 geotiff,2026-04-16T18:00:00Z,SAFE,IO-bound,0,fixed-in-tree,"Fixed-in-tree 2026-04-16: (1) read_geotiff_dask caps total chunk count at 1M and auto-scales chunks with a UserWarning when exceeded (linear graph scaling confirmed: 16384 tasks/1109ms vs 16 tasks/65ms baseline). (2) _nvcomp_batch_compress deflate path now batches GPU->CPU adler32 transfers via a single contiguous .get() and memoryview slicing, eliminating N per-tile sync points. 435 geotiff tests pass (2 pre-existing matplotlib deepcopy unrelated)."
 glcm,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,"Downgraded to MEDIUM. da.stack without rechunk is scheduling overhead, not OOM risk."
 hillshade,2026-04-16T12:00:00Z,SAFE,compute-bound,0,,"Re-audit after Horn's method rewrite (PR 1175): clean stencil, map_overlap depth=(1,1), no materialization. Zero findings."
+hydro,2026-05-01,RISKY,memory-bound,0,1416,"Fixed-in-tree 2026-05-01: hand_mfd._hand_mfd_dask now assembles via da.map_blocks instead of eager da.block of pre-computed tiles (matches hand_dinf pattern). Remaining MEDIUM: sink_d8 CCL fully materializes labels (inherently global), flow_accumulation_mfd frac_bdry held in driver dict instead of memmap-backed BoundaryStore. D8 iterative paths (flow_accum/fill/watershed/basin/stream_*) use serial-tile sweep with memmap-backed boundary store -- per-tile RAM bounded but driver iterates O(diameter) times. flow_direction_*, flow_path/snap_pour_point/twi/hand_d8/hand_dinf are SAFE."
 kde,2026-04-14T12:00:00Z,SAFE,compute-bound,0,,Graph construction serialized per-tile. _filter_points_to_tile scans all points per tile. No HIGH findings.
 mahalanobis,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,False positive. Numpy path materializes by design. Dask path uses lazy reductions + map_blocks.
 morphology,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
diff --git a/xrspatial/hydro/hand_mfd.py b/xrspatial/hydro/hand_mfd.py
index 146ccb7b..23d49507 100644
--- a/xrspatial/hydro/hand_mfd.py
+++ b/xrspatial/hydro/hand_mfd.py
@@ -597,35 +597,49 @@ def _hand_mfd_dask(fractions_da, flow_accum_da, elev_da, threshold,
 
     boundaries = boundaries.snapshot()
 
-    # Assemble
-    rows = []
-    for iy in range(n_tile_y):
-        row = []
-        for ix in range(n_tile_x):
-            y_start = sum(chunks_y[:iy])
-            y_end = y_start + chunks_y[iy]
-            x_start = sum(chunks_x[:ix])
-            x_end = x_start + chunks_x[ix]
-
-            fr_chunk = np.asarray(
-                fractions_da[:, y_start:y_end, x_start:x_end].compute(),
-                dtype=np.float64)
-            fa_chunk = np.asarray(
-                flow_accum_da.blocks[iy, ix].compute(), dtype=np.float64)
-            el_chunk = np.asarray(
-                elev_da.blocks[iy, ix].compute(), dtype=np.float64)
-            _, h, w = fr_chunk.shape
-
-            exits = _compute_exit_labels_mfd(
-                iy, ix, boundaries, frac_bdry,
-                chunks_y, chunks_x, n_tile_y, n_tile_x)
-
-            tile = _hand_mfd_tile_kernel(
-                fr_chunk, fa_chunk, el_chunk, h, w, threshold, *exits)
-            row.append(da.from_array(tile, chunks=tile.shape))
-        rows.append(row)
-
-    return da.block(rows)
+    # Lazy assembly: each tile is recomputed on demand from the converged
+    # boundary state. Driver memory holds only the captured ``boundaries``
+    # / ``frac_bdry`` snapshots (boundary strips, not full tiles), so peak
+    # memory scales with chunk size rather than the full grid.
+    #
+    # ``fractions_da`` is 3D (8, H, W); we cannot align its chunks with the
+    # 2D output via map_blocks directly. We pre-compute the (y_start,
+    # y_end, x_start, x_end) offsets per tile so each map_blocks closure
+    # call only triggers its own fractions slice.
+    cum_y = np.zeros(n_tile_y + 1, dtype=np.int64)
+    np.cumsum(chunks_y, out=cum_y[1:])
+    cum_x = np.zeros(n_tile_x + 1, dtype=np.int64)
+    np.cumsum(chunks_x, out=cum_x[1:])
+
+    def _tile_fn(fa_block, el_block, block_info=None):
+        if block_info is None or 0 not in block_info:
+            return np.full(fa_block.shape, np.nan, dtype=np.float64)
+        iy, ix = block_info[0]['chunk-location']
+        y_start = int(cum_y[iy])
+        y_end = int(cum_y[iy + 1])
+        x_start = int(cum_x[ix])
+        x_end = int(cum_x[ix + 1])
+
+        fr_chunk = np.asarray(
+            fractions_da[:, y_start:y_end, x_start:x_end].compute(),
+            dtype=np.float64)
+        fa_chunk = np.asarray(fa_block, dtype=np.float64)
+        el_chunk = np.asarray(el_block, dtype=np.float64)
+        _, h, w = fr_chunk.shape
+
+        exits = _compute_exit_labels_mfd(
+            iy, ix, boundaries, frac_bdry,
+            chunks_y, chunks_x, n_tile_y, n_tile_x)
+
+        return _hand_mfd_tile_kernel(
+            fr_chunk, fa_chunk, el_chunk, h, w, threshold, *exits)
+
+    return da.map_blocks(
+        _tile_fn,
+        flow_accum_da, elev_da,
+        dtype=np.float64,
+        meta=np.array((), dtype=np.float64),
+    )
 
 
 def _hand_mfd_dask_cupy(fractions_da, flow_accum_da, elev_da, threshold,
diff --git a/xrspatial/hydro/tests/test_hand_mfd.py b/xrspatial/hydro/tests/test_hand_mfd.py
index 3092b4b1..bead8b2b 100644
--- a/xrspatial/hydro/tests/test_hand_mfd.py
+++ b/xrspatial/hydro/tests/test_hand_mfd.py
@@ -165,6 +165,61 @@ def test_numpy_equals_dask(self, chunks):
             equal_nan=True, rtol=1e-10)
 
+
+@dask_array_available
+class TestHandMfdDaskLazyAssembly:
+    """Regression for issue #1416.
+
+    The dask assembly phase must use ``map_blocks`` so each tile is
+    recomputed on demand from the converged boundary state. An earlier
+    implementation built a list-of-lists of pre-computed numpy arrays
+    and called ``da.block``, which held every tile in driver memory at
+    once and OOMed on large inputs.
+ """ + + def test_dask_result_is_lazy(self): + import dask.array as da + + fracs = _make_all_east(8, 8) + fa = np.ones((8, 8), dtype=np.float64) + elev = np.zeros((8, 8), dtype=np.float64) + fd_da, fa_da, el_da = _make_hand_mfd_rasters( + fracs, fa, elev, backend='dask', chunks=(2, 2)) + + result = hand_mfd(fd_da, fa_da, el_da, threshold=1) + + # The result must be a dask-backed DataArray with intact chunking. + assert isinstance(result.data, da.Array) + assert result.data.chunks == ((2, 2, 2, 2), (2, 2, 2, 2)) + + def test_dask_assembly_does_not_materialize_all_tiles(self): + """The lazy graph must contain a separate task per output chunk. + + ``da.block`` of pre-computed arrays produces a single ``from_array`` + task plus concatenation overhead. ``map_blocks`` produces one + kernel-call task per chunk, which is what we need so that only one + tile is in RAM at a time during execution. + """ + import dask.array as da + + fracs = _make_all_east(6, 6) + fa = np.ones((6, 6), dtype=np.float64) + elev = np.zeros((6, 6), dtype=np.float64) + fd_da, fa_da, el_da = _make_hand_mfd_rasters( + fracs, fa, elev, backend='dask', chunks=(2, 2)) + + result = hand_mfd(fd_da, fa_da, el_da, threshold=1) + + # 3x3 = 9 output chunks. Number of leaf keys for the result name + # must match the chunk count. + result_arr = result.data + assert isinstance(result_arr, da.Array) + result_keys = [ + k for k in result_arr.__dask_graph__() + if isinstance(k, tuple) and k[0] == result_arr.name + ] + assert len(result_keys) == 9 + + @cuda_and_cupy_available class TestHandMfdCuPy: def test_numpy_equals_cupy(self):