diff --git a/xrspatial/hydro/tests/test_watershed_mfd.py b/xrspatial/hydro/tests/test_watershed_mfd.py index dbf9955b..e997c444 100644 --- a/xrspatial/hydro/tests/test_watershed_mfd.py +++ b/xrspatial/hydro/tests/test_watershed_mfd.py @@ -202,3 +202,91 @@ def test_numpy_equals_dask_cupy(): np.testing.assert_allclose( np_result.data, dcp_result.data.compute().get(), equal_nan=True) + + +# ==================================================================== +# Memory guard tests +# ==================================================================== + +class TestMemoryGuard: + """Memory guard on the eager numpy / cupy backends.""" + + def test_numpy_huge_raster_raises(self): + """Numpy backend raises MemoryError when projected RAM exceeds budget.""" + from unittest.mock import patch + + fracs = _make_all_east(4, 4) + pp = np.full((4, 4), np.nan, dtype=np.float64) + pp[0, 0] = 1.0 + fd_da = _make_mfd_raster(fracs, backend='numpy') + pp_da = create_test_raster(pp, backend='numpy') + + with patch( + "xrspatial.hydro.watershed_mfd._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="working memory"): + watershed_mfd(fd_da, pp_da) + + def test_numpy_normal_input_succeeds(self): + """Normal-size raster passes the guard with real memory.""" + fracs = _make_all_east(10, 10) + pp = np.full((10, 10), np.nan, dtype=np.float64) + pp[0, 0] = 1.0 + fd_da = _make_mfd_raster(fracs, backend='numpy') + pp_da = create_test_raster(pp, backend='numpy') + result = watershed_mfd(fd_da, pp_da) + assert result.shape == (10, 10) + + @dask_array_available + def test_dask_path_skips_guard(self): + """Dask backend bypasses the guard -- per-tile allocations are bounded.""" + from unittest.mock import patch + + fracs = _make_all_east(6, 6) + pp = np.full((6, 6), np.nan, dtype=np.float64) + pp[0, 0] = 1.0 + fd_da = _make_mfd_raster(fracs, backend='dask', chunks=(3, 3)) + pp_da = create_test_raster(pp, backend='dask', chunks=(3, 3)) + + with patch( + "xrspatial.hydro.watershed_mfd._available_memory_bytes", + return_value=1, + ): + result = watershed_mfd(fd_da, pp_da) + _ = result.data[:3, :3].compute() + + def test_error_message_mentions_dimensions(self): + """The error message should mention the grid dimensions and dask.""" + from unittest.mock import patch + + fracs = _make_all_east(7, 9) + pp = np.full((7, 9), np.nan, dtype=np.float64) + pp[0, 0] = 1.0 + fd_da = _make_mfd_raster(fracs, backend='numpy') + pp_da = create_test_raster(pp, backend='numpy') + + with patch( + "xrspatial.hydro.watershed_mfd._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match=r"7x9.*dask"): + watershed_mfd(fd_da, pp_da) + + @cuda_and_cupy_available + def test_cupy_huge_raster_raises(self): + """CuPy backend raises MemoryError when projected GPU RAM exceeds budget.""" + from unittest.mock import patch + + fracs = _make_all_east(4, 4) + pp = np.full((4, 4), np.nan, dtype=np.float64) + pp[0, 0] = 1.0 + fd_da = _make_mfd_raster(fracs, backend='cupy') + pp_da = create_test_raster(pp, backend='cupy') + + with patch( + "xrspatial.hydro.watershed_mfd._available_gpu_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="GPU working memory"): + watershed_mfd(fd_da, pp_da) diff --git a/xrspatial/hydro/watershed_mfd.py b/xrspatial/hydro/watershed_mfd.py index 5a7ddbc1..6be3a0f5 100644 --- a/xrspatial/hydro/watershed_mfd.py +++ b/xrspatial/hydro/watershed_mfd.py @@ -47,6 +47,93 @@ def _to_numpy_f64(arr): return np.asarray(arr, dtype=np.float64) +# ===================================================================== +# Memory guards +# ===================================================================== +# +# CPU peak working set per pixel for the numpy dispatch + the +# ``_watershed_mfd_cpu`` kernel: +# fr (float64 cast of (8, H, W) fractions) -> 64 +# labels (float64) -> 8 +# state (int8) -> 1 +# path_r (int64) -> 8 +# path_c (int64) -> 8 +# Total ~97 bytes/pixel. The MFD fractions buffer is the new cost +# relative to ``watershed_d8`` (which only needs an 8 B/pixel D8 array). +_BYTES_PER_PIXEL = 97 + +# GPU peak working set per pixel for ``_watershed_mfd_cupy``. The +# function copies the device fractions array to the host, runs the CPU +# kernel, and ships the result back. Device-resident peak is the +# caller's float64 fractions input (8 channels x 8 bytes = 64) plus the +# final ``cp.asarray(out)`` (8) -> 72 bytes/pixel. +_GPU_BYTES_PER_PIXEL = 72 + + +def _available_memory_bytes(): + """Best-effort estimate of available host memory in bytes.""" + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + return int(line.split()[1]) * 1024 # kB -> bytes + except (OSError, ValueError, IndexError): + pass + try: + import psutil + return psutil.virtual_memory().available + except (ImportError, AttributeError): + pass + return 2 * 1024 ** 3 + + +def _available_gpu_memory_bytes(): + """Best-effort estimate of free GPU memory in bytes. + + Returns 0 if CuPy / CUDA is unavailable or the query fails -- callers + use that as a sentinel meaning "no GPU info, skip the guard". + """ + try: + import cupy as _cp + free, _total = _cp.cuda.runtime.memGetInfo() + return int(free) + except Exception: + return 0 + + +def _check_memory(height, width): + """Raise MemoryError if the watershed_mfd kernel would exceed 50% of RAM.""" + required = int(height) * int(width) * _BYTES_PER_PIXEL + available = _available_memory_bytes() + if required > 0.5 * available: + raise MemoryError( + f"watershed_mfd on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of working memory but only " + f"~{available / 1e9:.1f} GB is available. Use a " + f"dask-backed DataArray for out-of-core processing." + ) + + +def _check_gpu_memory(height, width): + """Raise MemoryError if the cupy path would exceed 50% of free GPU RAM. + + Skips the check (returns silently) when ``_available_gpu_memory_bytes`` + cannot determine the free memory -- e.g. on hosts without CUDA, where + the kernel will fail at the cupy.asarray boundary anyway. + """ + available = _available_gpu_memory_bytes() + if available <= 0: + return + required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL + if required > 0.5 * available: + raise MemoryError( + f"watershed_mfd on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of GPU working memory but only " + f"~{available / 1e9:.1f} GB is free on the active device. " + f"Use a dask+cupy DataArray for out-of-core processing." + ) + + def _dominant_offset_mfd_py(fractions_8): """Return (dy, dx) of dominant MFD neighbor, or (0,0) for pit/nodata.""" best_k = -1 @@ -592,6 +679,7 @@ def watershed_mfd(flow_dir_mfd: xr.DataArray, _, H, W = data.shape if isinstance(data, np.ndarray): + _check_memory(H, W) fr = data.astype(np.float64) pp = np.asarray(pp_data, dtype=np.float64) labels = np.full((H, W), np.nan, dtype=np.float64) @@ -608,6 +696,7 @@ def watershed_mfd(flow_dir_mfd: xr.DataArray, out = _watershed_mfd_cpu(fr, labels, state, H, W) elif has_cuda_and_cupy() and is_cupy_array(data): + _check_gpu_memory(H, W) out = _watershed_mfd_cupy(data, pp_data) elif has_cuda_and_cupy() and is_dask_cupy(flow_dir_mfd):