Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions xrspatial/hydro/flow_direction_mfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,65 @@ class cupy(object):

_SQRT2_INV = 1.0 / math.sqrt(2.0)

# Working memory budget for the (8, H, W) float64 fractions array.
_BYTES_PER_PIXEL = 8 * 8 # 8 bands * 8 bytes/float64
_GPU_BYTES_PER_PIXEL = 8 * 8


def _available_memory_bytes():
"""Best-effort estimate of available host memory in bytes."""
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if line.startswith('MemAvailable:'):
return int(line.split()[1]) * 1024 # kB -> bytes
except (OSError, ValueError, IndexError):
pass
try:
import psutil
return psutil.virtual_memory().available
except (ImportError, AttributeError):
pass
return 2 * 1024 ** 3


def _available_gpu_memory_bytes():
"""Free GPU memory in bytes, or 0 when CUDA is unavailable."""
try:
import cupy as _cp
free, _total = _cp.cuda.runtime.memGetInfo()
return int(free)
except Exception:
return 0


def _check_memory(height, width):
"""Raise MemoryError if the (8, H, W) buffer would exceed 50% of RAM."""
required = int(height) * int(width) * _BYTES_PER_PIXEL
available = _available_memory_bytes()
if required > 0.5 * available:
raise MemoryError(
f"flow_direction_mfd on a {height}x{width} grid requires "
f"~{required / 1e9:.1f} GB of working memory but only "
f"~{available / 1e9:.1f} GB is available. Use a "
f"dask-backed DataArray for out-of-core processing."
)


def _check_gpu_memory(height, width):
"""Raise MemoryError if the (8, H, W) buffer would exceed 50% of free VRAM."""
available = _available_gpu_memory_bytes()
if available <= 0:
return
required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL
if required > 0.5 * available:
raise MemoryError(
f"flow_direction_mfd on a {height}x{width} grid requires "
f"~{required / 1e9:.1f} GB of GPU working memory but only "
f"~{available / 1e9:.1f} GB is free on the active device. "
f"Use a dask+cupy DataArray for out-of-core processing."
)


# =====================================================================
# CPU kernel
Expand Down Expand Up @@ -230,6 +289,7 @@ def _run_numpy(data: np.ndarray,
cellsize_y: Union[int, float],
p_fixed: float,
boundary: str = 'nan') -> np.ndarray:
_check_memory(data.shape[0], data.shape[1])
data = data.astype(np.float64)
if boundary == 'nan':
return _cpu(data, cellsize_x, cellsize_y, p_fixed)
Expand Down Expand Up @@ -271,6 +331,7 @@ def _run_cupy(data: cupy.ndarray,
cellsize_y: Union[int, float],
p_fixed: float,
boundary: str = 'nan') -> cupy.ndarray:
_check_gpu_memory(data.shape[0], data.shape[1])
if boundary != 'nan':
padded = _pad_array(data, 1, boundary)
result = _run_cupy(padded, cellsize_x, cellsize_y, p_fixed)
Expand Down
53 changes: 53 additions & 0 deletions xrspatial/hydro/tests/test_flow_direction_mfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,56 @@ def test_known_values_cardinal_vs_diagonal():
# SE: slope = 5/diag = 5/0.707 = 7.07, contour = 1/sqrt(2) = 0.707, weight = 5.0
# E should get more than SE
assert e_frac > se_frac, f"E={e_frac} should exceed SE={se_frac}"


# =====================================================================
# Memory guard
# =====================================================================

class TestMemoryGuard:
"""Memory guard on the eager numpy / cupy backends (issue #1423)."""

def test_numpy_huge_raster_raises(self):
"""Numpy backend raises MemoryError when projected RAM exceeds budget."""
from unittest.mock import patch

elev = np.full((7, 7), 5.0, dtype=np.float64)

with patch(
"xrspatial.hydro.flow_direction_mfd._available_memory_bytes",
return_value=1,
):
with pytest.raises(MemoryError, match="working memory"):
flow_direction_mfd(create_test_raster(elev))

def test_numpy_normal_input_succeeds(self):
"""Normal-size raster passes the guard with real memory."""
elev = np.full((7, 7), 5.0, dtype=np.float64)
result = flow_direction_mfd(create_test_raster(elev))
assert result.shape == (8, 7, 7)

def test_error_message_mentions_dimensions(self):
"""Error message should mention the offending grid dimensions."""
from unittest.mock import patch

elev = np.full((7, 7), 5.0, dtype=np.float64)

with patch(
"xrspatial.hydro.flow_direction_mfd._available_memory_bytes",
return_value=1,
):
with pytest.raises(MemoryError, match="7x7"):
flow_direction_mfd(create_test_raster(elev))

def test_error_message_mentions_dask(self):
"""The error message should suggest the dask alternative."""
from unittest.mock import patch

elev = np.full((7, 7), 5.0, dtype=np.float64)

with patch(
"xrspatial.hydro.flow_direction_mfd._available_memory_bytes",
return_value=1,
):
with pytest.raises(MemoryError, match="dask"):
flow_direction_mfd(create_test_raster(elev))
Loading