From b80db3d24d6a494cbcb5db89dbc0fc1af3bfe23a Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sun, 3 May 2026 15:55:07 -0700 Subject: [PATCH] Add memory guard to flow_direction_mfd numpy/cupy backends (#1423) The (8, H, W) float64 fractions buffer is an 8x amplifier on top of the input raster. Without a guard, a 50000x50000 DEM asks for ~100 GB silently before any error. Port the _check_memory / _check_gpu_memory helpers from flow_accumulation_mfd (64 bytes/pixel budget, 50% threshold) and call them from _run_numpy and _run_cupy. Dask paths process per-chunk through map_overlap and inherit the per-chunk guard. --- xrspatial/hydro/flow_direction_mfd.py | 61 +++++++++++++++++++ .../hydro/tests/test_flow_direction_mfd.py | 53 ++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/xrspatial/hydro/flow_direction_mfd.py b/xrspatial/hydro/flow_direction_mfd.py index eb9c5f2d..bc2e793d 100644 --- a/xrspatial/hydro/flow_direction_mfd.py +++ b/xrspatial/hydro/flow_direction_mfd.py @@ -35,6 +35,65 @@ class cupy(object): _SQRT2_INV = 1.0 / math.sqrt(2.0) +# Working memory budget for the (8, H, W) float64 fractions array. +_BYTES_PER_PIXEL = 8 * 8 # 8 bands * 8 bytes/float64 +_GPU_BYTES_PER_PIXEL = 8 * 8 + + +def _available_memory_bytes(): + """Best-effort estimate of available host memory in bytes.""" + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + return int(line.split()[1]) * 1024 # kB -> bytes + except (OSError, ValueError, IndexError): + pass + try: + import psutil + return psutil.virtual_memory().available + except (ImportError, AttributeError): + pass + return 2 * 1024 ** 3 + + +def _available_gpu_memory_bytes(): + """Free GPU memory in bytes, or 0 when CUDA is unavailable.""" + try: + import cupy as _cp + free, _total = _cp.cuda.runtime.memGetInfo() + return int(free) + except Exception: + return 0 + + +def _check_memory(height, width): + """Raise MemoryError if the (8, H, W) buffer would exceed 50% of RAM.""" + required = int(height) * int(width) * _BYTES_PER_PIXEL + available = _available_memory_bytes() + if required > 0.5 * available: + raise MemoryError( + f"flow_direction_mfd on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of working memory but only " + f"~{available / 1e9:.1f} GB is available. Use a " + f"dask-backed DataArray for out-of-core processing." + ) + + +def _check_gpu_memory(height, width): + """Raise MemoryError if the (8, H, W) buffer would exceed 50% of free VRAM.""" + available = _available_gpu_memory_bytes() + if available <= 0: + return + required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL + if required > 0.5 * available: + raise MemoryError( + f"flow_direction_mfd on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of GPU working memory but only " + f"~{available / 1e9:.1f} GB is free on the active device. " + f"Use a dask+cupy DataArray for out-of-core processing." + ) + # ===================================================================== # CPU kernel @@ -230,6 +289,7 @@ def _run_numpy(data: np.ndarray, cellsize_y: Union[int, float], p_fixed: float, boundary: str = 'nan') -> np.ndarray: + _check_memory(data.shape[0], data.shape[1]) data = data.astype(np.float64) if boundary == 'nan': return _cpu(data, cellsize_x, cellsize_y, p_fixed) @@ -271,6 +331,7 @@ def _run_cupy(data: cupy.ndarray, cellsize_y: Union[int, float], p_fixed: float, boundary: str = 'nan') -> cupy.ndarray: + _check_gpu_memory(data.shape[0], data.shape[1]) if boundary != 'nan': padded = _pad_array(data, 1, boundary) result = _run_cupy(padded, cellsize_x, cellsize_y, p_fixed) diff --git a/xrspatial/hydro/tests/test_flow_direction_mfd.py b/xrspatial/hydro/tests/test_flow_direction_mfd.py index a689ebbb..2adf9060 100644 --- a/xrspatial/hydro/tests/test_flow_direction_mfd.py +++ b/xrspatial/hydro/tests/test_flow_direction_mfd.py @@ -512,3 +512,56 @@ def test_known_values_cardinal_vs_diagonal(): # SE: slope = 5/diag = 5/0.707 = 7.07, contour = 1/sqrt(2) = 0.707, weight = 5.0 # E should get more than SE assert e_frac > se_frac, f"E={e_frac} should exceed SE={se_frac}" + + +# ===================================================================== +# Memory guard +# ===================================================================== + +class TestMemoryGuard: + """Memory guard on the eager numpy / cupy backends (issue #1423).""" + + def test_numpy_huge_raster_raises(self): + """Numpy backend raises MemoryError when projected RAM exceeds budget.""" + from unittest.mock import patch + + elev = np.full((7, 7), 5.0, dtype=np.float64) + + with patch( + "xrspatial.hydro.flow_direction_mfd._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="working memory"): + flow_direction_mfd(create_test_raster(elev)) + + def test_numpy_normal_input_succeeds(self): + """Normal-size raster passes the guard with real memory.""" + elev = np.full((7, 7), 5.0, dtype=np.float64) + result = flow_direction_mfd(create_test_raster(elev)) + assert result.shape == (8, 7, 7) + + def test_error_message_mentions_dimensions(self): + """Error message should mention the offending grid dimensions.""" + from unittest.mock import patch + + elev = np.full((7, 7), 5.0, dtype=np.float64) + + with patch( + "xrspatial.hydro.flow_direction_mfd._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="7x7"): + flow_direction_mfd(create_test_raster(elev)) + + def test_error_message_mentions_dask(self): + """The error message should suggest the dask alternative.""" + from unittest.mock import patch + + elev = np.full((7, 7), 5.0, dtype=np.float64) + + with patch( + "xrspatial.hydro.flow_direction_mfd._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="dask"): + flow_direction_mfd(create_test_raster(elev))