diff --git a/CHANGELOG.md b/CHANGELOG.md index deee21e1..0d2ebeb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Version 0.8.0 - 2026-03-03 #### New Features +- Add raster-based dasymetric mapping: disaggregate, pycnophylactic interpolation, and validation helper (#930) - Add enhanced terrain generation features (#929) - Add fire module: dNBR, RdNBR, burn severity, fireline intensity, flame length, rate of spread (Rothermel), and KBDI (#927) - Add flood prediction tools (#926) diff --git a/docs/source/reference/dasymetric.rst b/docs/source/reference/dasymetric.rst new file mode 100644 index 00000000..89e61a42 --- /dev/null +++ b/docs/source/reference/dasymetric.rst @@ -0,0 +1,26 @@ +.. _dasymetric: + +*********** +Dasymetric +*********** + +Disaggregate +============ +.. autosummary:: + :toctree: _autosummary + + xrspatial.dasymetric.disaggregate + +Pycnophylactic +============== +.. autosummary:: + :toctree: _autosummary + + xrspatial.dasymetric.pycnophylactic + +Validate +======== +.. 
autosummary:: + :toctree: _autosummary + + xrspatial.dasymetric.validate_disaggregation diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index 90d32853..9c904400 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -8,6 +8,7 @@ Reference :maxdepth: 2 classification + dasymetric fire flood focal diff --git a/examples/user_guide/14_Dasymetric_Mapping.ipynb b/examples/user_guide/14_Dasymetric_Mapping.ipynb new file mode 100644 index 00000000..ac3e512e --- /dev/null +++ b/examples/user_guide/14_Dasymetric_Mapping.ipynb @@ -0,0 +1,216 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dasymetric Mapping\n", + "\n", + "Dasymetric mapping redistributes aggregate zone-level data (like population per census tract) onto a finer raster grid using ancillary weight information (land cover, nighttime lights, etc.).\n", + "\n", + "This is the spatial inverse of `zonal.stats`: instead of aggregating pixel values into zone summaries, we spread zone-level totals back across pixels, weighted by auxiliary data.\n", + "\n", + "`xrspatial.disaggregate` supports three methods:\n", + "- **`'binary'`** -- split value equally among nonzero-weight pixels\n", + "- **`'weighted'`** (default) -- distribute proportionally to weight values\n", + "- **`'limiting_variable'`** -- three-class dasymetric with density caps (numpy-only)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import xarray as xr\n", + "import matplotlib.pyplot as plt\n", + "\n", + "from xrspatial import disaggregate" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Generate Synthetic Data\n", + "\n", + "We create a small grid of census-tract-like zones, assign population values per zone, and build a land-cover-based weight raster." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# 10x10 zone raster with 4 \"census tracts\"\n", + "zones_data = np.ones((10, 10), dtype=np.float64)\n", + "zones_data[:5, :5] = 1\n", + "zones_data[:5, 5:] = 2\n", + "zones_data[5:, :5] = 3\n", + "zones_data[5:, 5:] = 4\n", + "\n", + "zones = xr.DataArray(zones_data, dims=['y', 'x'])\n", + "\n", + "# Population per tract\n", + "population = {1: 1000.0, 2: 500.0, 3: 2000.0, 4: 300.0}\n", + "\n", + "# Ancillary weight raster -- simulates land cover suitability\n", + "np.random.seed(42)\n", + "weight_data = np.random.rand(10, 10).astype(np.float64)\n", + "# Make some areas uninhabitable (zero weight)\n", + "weight_data[0:2, 0:2] = 0.0 # water body in tract 1\n", + "weight_data[7:9, 7:9] = 0.0 # park in tract 4\n", + "\n", + "weight = xr.DataArray(weight_data, dims=['y', 'x'])\n", + "\n", + "fig, axes = plt.subplots(1, 2, figsize=(10, 4))\n", + "zones.plot(ax=axes[0], cmap='Set2')\n", + "axes[0].set_title('Zones (Census Tracts)')\n", + "weight.plot(ax=axes[1], cmap='YlGn')\n", + "axes[1].set_title('Weight (Land Cover)')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Binary Method\n", + "\n", + "The `'binary'` method binarizes the weight raster (nonzero becomes 1, zero stays 0) and splits each zone's value equally among its nonzero pixels." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result_binary = disaggregate(zones, population, weight, method='binary')\n", + "\n", + "fig, ax = plt.subplots(figsize=(6, 5))\n", + "result_binary.plot(ax=ax, cmap='YlOrRd')\n", + "ax.set_title('Binary Dasymetric Result')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Weighted Method\n", + "\n", + "The `'weighted'` method (default) distributes each zone's value proportionally to the weight values:\n", + "\n", + "$$\\text{pixel} = \\text{zone\\_value} \\times \\frac{\\text{pixel\\_weight}}{\\sum_{\\text{zone}} \\text{weights}}$$" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result_weighted = disaggregate(zones, population, weight, method='weighted')\n", + "\n", + "fig, ax = plt.subplots(figsize=(6, 5))\n", + "result_weighted.plot(ax=ax, cmap='YlOrRd')\n", + "ax.set_title('Weighted Dasymetric Result')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Conservation Property\n", + "\n", + "A key property of dasymetric mapping: the sum of output pixel values within each zone should equal the original zone total." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for zid, expected in population.items():\n", + " actual = float(np.nansum(result_weighted.values[zones_data == zid]))\n", + " print(f'Zone {zid}: expected={expected:.1f}, actual={actual:.1f}, '\n", + " f'diff={abs(expected - actual):.2e}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Comparing Methods\n", + "\n", + "The binary method produces uniform density within each zone (among habitable pixels), while the weighted method produces spatially varying density." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, axes = plt.subplots(1, 2, figsize=(12, 5))\n", + "\n", + "result_binary.plot(ax=axes[0], cmap='YlOrRd', vmin=0,\n", + " vmax=float(np.nanmax(result_weighted.values)))\n", + "axes[0].set_title('Binary')\n", + "\n", + "result_weighted.plot(ax=axes[1], cmap='YlOrRd', vmin=0,\n", + " vmax=float(np.nanmax(result_weighted.values)))\n", + "axes[1].set_title('Weighted')\n", + "\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Limiting Variable Method\n", + "\n", + "The `'limiting_variable'` method applies per-class density caps and redistributes overflow iteratively. This is useful when you know certain land cover types have maximum population densities. Currently only available for numpy arrays." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result_lv = disaggregate(zones, population, weight,\n", + " method='limiting_variable')\n", + "\n", + "fig, ax = plt.subplots(figsize=(6, 5))\n", + "result_lv.plot(ax=ax, cmap='YlOrRd')\n", + "ax.set_title('Limiting Variable Result')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/xrspatial/__init__.py b/xrspatial/__init__.py index 8f896dc1..adc407a0 100644 --- a/xrspatial/__init__.py +++ b/xrspatial/__init__.py @@ -1,6 +1,9 @@ from xrspatial.aspect import aspect # noqa from xrspatial.bump import bump # noqa from xrspatial.cost_distance import cost_distance # noqa +from xrspatial.dasymetric import disaggregate # noqa +from xrspatial.dasymetric import pycnophylactic # noqa +from xrspatial.dasymetric import validate_disaggregation # 
noqa from xrspatial.classify import binary # noqa from xrspatial.classify import box_plot # noqa from xrspatial.classify import head_tail_breaks # noqa diff --git a/xrspatial/accessor.py b/xrspatial/accessor.py index 32e8e8ad..1692b398 100644 --- a/xrspatial/accessor.py +++ b/xrspatial/accessor.py @@ -235,6 +235,16 @@ def regions(self, **kwargs): from .zonal import regions return regions(self._obj, **kwargs) + # ---- Dasymetric ---- + + def disaggregate(self, values, weight, **kwargs): + from .dasymetric import disaggregate + return disaggregate(self._obj, values, weight, **kwargs) + + def pycnophylactic(self, values, **kwargs): + from .dasymetric import pycnophylactic + return pycnophylactic(self._obj, values, **kwargs) + # ---- Terrain generation ---- def generate_terrain(self, **kwargs): diff --git a/xrspatial/dasymetric.py b/xrspatial/dasymetric.py new file mode 100644 index 00000000..4445a2b6 --- /dev/null +++ b/xrspatial/dasymetric.py @@ -0,0 +1,704 @@ +"""Raster-based dasymetric mapping. + +Dasymetric mapping redistributes aggregate zone-level data (e.g. population +per census tract) onto a finer raster grid using ancillary weight information +(land cover, nighttime lights, etc.). This is the spatial inverse of +``zonal.stats``. + +Three redistribution methods are supported via :func:`disaggregate`: + +* ``'binary'`` -- binarize weights (nonzero -> 1), split value equally among + nonzero-weight pixels in each zone. +* ``'weighted'`` (default) -- distribute proportionally: + ``pixel = zone_value * pixel_weight / zone_weight_sum``. +* ``'limiting_variable'`` -- three-class dasymetric with per-class density + caps and iterative redistribution (numpy-only). + +Additionally, :func:`pycnophylactic` implements Tobler's smooth +pycnophylactic interpolation, which preserves zone totals through iterative +Laplacian smoothing with mass correction. + +:func:`validate_disaggregation` checks that zone totals are preserved within +tolerance after any redistribution. 
+ +All four backends (numpy, cupy, dask+numpy, dask+cupy) are supported for the +binary and weighted methods of ``disaggregate`` and for +``validate_disaggregation``. ``pycnophylactic`` supports numpy and cupy +(via CPU fallback). +""" + +from __future__ import annotations + +from typing import Union + +import numpy as np +import pandas as pd +import xarray as xr + +try: + import dask + import dask.array as da + from dask import delayed +except ImportError: + dask = None + da = None + delayed = None + +try: + import cupy +except ImportError: + class cupy: + ndarray = False + +from xrspatial.utils import ( + ArrayTypeFunctionMapping, + has_cuda_and_cupy, + has_dask_array, + is_cupy_array, + is_dask_cupy, +) + +VALID_METHODS = ('binary', 'weighted', 'limiting_variable') + + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + +def _normalize_values(values): + """Convert *values* (dict or pd.Series) to ``dict[int, float]``.""" + if isinstance(values, pd.Series): + return {int(k): float(v) for k, v in values.items()} + if isinstance(values, dict): + return {int(k): float(v) for k, v in values.items()} + raise TypeError( + f"'values' must be a dict or pd.Series, got {type(values).__name__}" + ) + + +# --------------------------------------------------------------------------- +# numpy backend +# --------------------------------------------------------------------------- + +def _disaggregate_numpy(zones, weight, values_dict, method, nodata_zone): + """Core numpy implementation. 
Returns a float64 ndarray.""" + zones_arr = np.asarray(zones, dtype=np.float64) + weight_arr = np.asarray(weight, dtype=np.float64) + + # clamp negative weights to 0 + weight_arr = np.where(weight_arr < 0, 0.0, weight_arr) + + # binarize for binary method + if method == 'binary': + weight_arr = np.where(weight_arr != 0, 1.0, 0.0) + + # mask: NaN in zones or weight, or nodata_zone + invalid = np.isnan(zones_arr) | np.isnan(weight_arr) + if nodata_zone is not None: + invalid |= (zones_arr == nodata_zone) + + # compute zone weight sums + zone_ids = np.array(sorted(values_dict.keys()), dtype=np.float64) + zone_sums = {} + for zid in zone_ids: + mask = (~invalid) & (zones_arr == zid) + zone_sums[int(zid)] = float(np.nansum(weight_arr[mask])) + + return _disaggregate_numpy_with_sums( + zones_arr, weight_arr, values_dict, zone_sums, invalid, + ) + + +def _disaggregate_numpy_with_sums( + zones_arr, weight_arr, values_dict, zone_sums, invalid, +): + """Distribute using precomputed zone weight sums. + + Used directly by numpy and also called per-chunk by the dask backend. 
+ """ + result = np.full(zones_arr.shape, np.nan, dtype=np.float64) + + for zid, zval in values_dict.items(): + mask = (~invalid) & (zones_arr == zid) + wsum = zone_sums.get(int(zid), 0.0) + if wsum == 0.0: + # zero total weight -> NaN + result[mask] = np.nan + else: + result[mask] = zval * weight_arr[mask] / wsum + + return result + + +# --------------------------------------------------------------------------- +# cupy backend -- CPU fallback +# --------------------------------------------------------------------------- + +def _disaggregate_cupy(zones, weight, values_dict, method, nodata_zone): + """CuPy fallback: transfer to CPU, run numpy, transfer back.""" + zones_np = zones.get() + weight_np = weight.get() + result_np = _disaggregate_numpy(zones_np, weight_np, values_dict, + method, nodata_zone) + return cupy.asarray(result_np) + + +# --------------------------------------------------------------------------- +# dask+numpy backend +# --------------------------------------------------------------------------- + +def _compute_zone_sums_dask(zones_da, weight_da, zone_ids, method, + nodata_zone): + """Eagerly compute per-zone weight sums across all dask chunks. + + Returns a plain ``dict[int, float]``. 
+ """ + zone_ids_set = set(int(z) for z in zone_ids) + + def _chunk_sums(z_block, w_block): + z = np.asarray(z_block, dtype=np.float64) + w = np.asarray(w_block, dtype=np.float64) + w = np.where(w < 0, 0.0, w) + if method == 'binary': + w = np.where(w != 0, 1.0, 0.0) + invalid = np.isnan(z) | np.isnan(w) + if nodata_zone is not None: + invalid |= (z == nodata_zone) + sums = {} + for zid in zone_ids_set: + mask = (~invalid) & (z == zid) + sums[zid] = float(np.nansum(w[mask])) + return sums + + # collect delayed chunk sums + z_blocks = zones_da.to_delayed().ravel() + w_blocks = weight_da.to_delayed().ravel() + chunk_results = dask.compute(*[ + delayed(_chunk_sums)(zb, wb) for zb, wb in zip(z_blocks, w_blocks) + ]) + + # merge across chunks + merged = {int(zid): 0.0 for zid in zone_ids_set} + for cr in chunk_results: + for zid, s in cr.items(): + merged[int(zid)] += s + return merged + + +def _distribute_chunk(z_block, w_block, values_dict, zone_sums, method, + nodata_zone): + """Map-blocks worker: distribute within a single chunk.""" + z = np.asarray(z_block, dtype=np.float64) + w = np.asarray(w_block, dtype=np.float64) + w = np.where(w < 0, 0.0, w) + if method == 'binary': + w = np.where(w != 0, 1.0, 0.0) + invalid = np.isnan(z) | np.isnan(w) + if nodata_zone is not None: + invalid |= (z == nodata_zone) + return _disaggregate_numpy_with_sums(z, w, values_dict, zone_sums, + invalid) + + +def _disaggregate_dask_numpy(zones_da, weight_da, values_dict, method, + nodata_zone): + """Dask+numpy: eagerly compute zone sums, lazily distribute.""" + zone_ids = list(values_dict.keys()) + zone_sums = _compute_zone_sums_dask(zones_da, weight_da, zone_ids, + method, nodata_zone) + + result = da.map_blocks( + _distribute_chunk, + zones_da, + weight_da, + values_dict=values_dict, + zone_sums=zone_sums, + method=method, + nodata_zone=nodata_zone, + dtype=np.float64, + ) + return result + + +# --------------------------------------------------------------------------- +# dask+cupy 
backend +# --------------------------------------------------------------------------- + +def _cupy_to_numpy_block(block): + """Convert a cupy array block to numpy.""" + if hasattr(block, 'get'): + return block.get() + return np.asarray(block) + + +def _disaggregate_dask_cupy(zones_da, weight_da, values_dict, method, + nodata_zone): + """Dask+cupy: convert chunks to numpy, delegate to dask+numpy path.""" + zones_np = da.map_blocks(_cupy_to_numpy_block, zones_da, + dtype=zones_da.dtype, + meta=np.array((), dtype=zones_da.dtype)) + weight_np = da.map_blocks(_cupy_to_numpy_block, weight_da, + dtype=weight_da.dtype, + meta=np.array((), dtype=weight_da.dtype)) + return _disaggregate_dask_numpy(zones_np, weight_np, values_dict, + method, nodata_zone) + + +# --------------------------------------------------------------------------- +# limiting variable (numpy-only) +# --------------------------------------------------------------------------- + +def _disaggregate_limiting_numpy(zones, weight, values_dict, nodata_zone, + class_breaks=(0.0,), density_caps=None): + """Three-class dasymetric with density caps and iterative redistribution. + + Parameters + ---------- + zones, weight : numpy arrays + values_dict : dict mapping zone_id -> value + nodata_zone : int or None + class_breaks : tuple of floats + Thresholds that split weight values into classes. For example + ``(0.0,)`` creates two classes: zero-weight and nonzero-weight. + density_caps : sequence of floats or None + Maximum density (value per pixel) for each class. If None, + no capping is applied and the result equals the weighted method. 
+ """ + zones_arr = np.asarray(zones, dtype=np.float64) + weight_arr = np.asarray(weight, dtype=np.float64) + weight_arr = np.where(weight_arr < 0, 0.0, weight_arr) + + invalid = np.isnan(zones_arr) | np.isnan(weight_arr) + if nodata_zone is not None: + invalid |= (zones_arr == nodata_zone) + + # classify pixels + breaks = sorted(class_breaks) + n_classes = len(breaks) + 1 + pixel_class = np.zeros(zones_arr.shape, dtype=np.int32) + for i, b in enumerate(breaks): + pixel_class[weight_arr > b] = i + 1 + + if density_caps is None: + density_caps = [np.inf] * n_classes + + result = np.full(zones_arr.shape, np.nan, dtype=np.float64) + + for zid, zval in values_dict.items(): + zmask = (~invalid) & (zones_arr == zid) + remaining = zval + + for cls in range(n_classes): + cls_mask = zmask & (pixel_class == cls) + n_pixels = int(cls_mask.sum()) + if n_pixels == 0: + continue + cap = density_caps[cls] + allocated = min(remaining, n_pixels * cap) + if n_pixels > 0: + result[cls_mask] = allocated / n_pixels + remaining -= allocated + if remaining <= 0: + break + + # distribute any leftover evenly across all zone pixels + if remaining > 0: + n_total = int(zmask.sum()) + if n_total > 0: + result[zmask] += remaining / n_total + + return result + + +# --------------------------------------------------------------------------- +# public API +# --------------------------------------------------------------------------- + +def disaggregate( + zones: xr.DataArray, + values: Union[dict, pd.Series], + weight: xr.DataArray, + method: str = 'weighted', + nodata_zone: Union[int, None] = None, + name: str = 'disaggregate', +) -> xr.DataArray: + """Redistribute zone-level values onto a raster using ancillary weights. + + This is the spatial inverse of :func:`~xrspatial.zonal.stats`: given + aggregate values per zone (e.g. population per census tract) and a + weight surface (e.g. land cover), ``disaggregate`` distributes each + zone's total across its pixels proportionally to their weights. 
+ + Parameters + ---------- + zones : xr.DataArray + 2-D integer raster identifying zone membership for each pixel. + values : dict or pd.Series + Mapping of ``zone_id -> value`` to redistribute. + weight : xr.DataArray + 2-D float raster of ancillary weights (e.g. land-cover suitability). + Negative weights are clamped to zero. + method : str, default ``'weighted'`` + Redistribution method: + + * ``'binary'`` -- binarize weights (nonzero -> 1), split equally. + * ``'weighted'`` -- proportional: ``pixel = zone_value * + pixel_weight / zone_weight_sum``. + * ``'limiting_variable'`` -- three-class dasymetric with per-class + density caps (numpy-only). + nodata_zone : int or None, default None + Zone ID to treat as no-data (output NaN). + name : str, default ``'disaggregate'`` + Name for the output DataArray. + + Returns + ------- + xr.DataArray + Float raster with the same shape, coordinates, and attributes as + *zones*. The sum of output pixels within each zone equals the + input zone value (conservation property), except where all weights + are zero (result is NaN). 
+ + Examples + -------- + >>> import numpy as np, xarray as xr + >>> from xrspatial import disaggregate + >>> zones = xr.DataArray(np.array([[1, 1], [2, 2]]), dims=['y', 'x']) + >>> weight = xr.DataArray(np.array([[1.0, 3.0], [2.0, 2.0]]), dims=['y', 'x']) + >>> result = disaggregate(zones, {1: 100, 2: 50}, weight) + >>> result.values + array([[25., 75.], + [25., 25.]]) + """ + # --- validation --- + if not isinstance(zones, xr.DataArray): + raise TypeError( + f"'zones' must be an xr.DataArray, got {type(zones).__name__}" + ) + if not isinstance(weight, xr.DataArray): + raise TypeError( + f"'weight' must be an xr.DataArray, got {type(weight).__name__}" + ) + if zones.ndim != 2: + raise ValueError( + f"'zones' must be 2-D, got {zones.ndim}-D" + ) + if weight.ndim != 2: + raise ValueError( + f"'weight' must be 2-D, got {weight.ndim}-D" + ) + if zones.shape != weight.shape: + raise ValueError( + f"'zones' and 'weight' must have the same shape, " + f"got {zones.shape} and {weight.shape}" + ) + + method = method.lower() + if method not in VALID_METHODS: + raise ValueError( + f"Invalid method {method!r}. 
Must be one of {VALID_METHODS}" + ) + + values_dict = _normalize_values(values) + + # --- limiting_variable: numpy-only --- + if method == 'limiting_variable': + _data = zones.data + if has_dask_array() and isinstance(_data, da.Array): + raise NotImplementedError( + "method='limiting_variable' is not supported for dask arrays" + ) + if has_cuda_and_cupy() and is_cupy_array(_data): + raise NotImplementedError( + "method='limiting_variable' is not supported for cupy arrays" + ) + result_data = _disaggregate_limiting_numpy( + zones.data, weight.data, values_dict, nodata_zone, + ) + return xr.DataArray( + result_data, dims=zones.dims, coords=zones.coords, + attrs=zones.attrs, name=name, + ) + + # --- dispatch by backend --- + mapper = ArrayTypeFunctionMapping( + numpy_func=_disaggregate_numpy, + cupy_func=_disaggregate_cupy, + dask_func=_disaggregate_dask_numpy, + dask_cupy_func=_disaggregate_dask_cupy, + ) + func = mapper(zones) + result_data = func(zones.data, weight.data, values_dict, method, + nodata_zone) + + return xr.DataArray( + result_data, dims=zones.dims, coords=zones.coords, + attrs=zones.attrs, name=name, + ) + + +# --------------------------------------------------------------------------- +# pycnophylactic interpolation +# --------------------------------------------------------------------------- + +def _pycnophylactic_numpy(zones_arr, values_dict, nodata_zone, + max_iterations, convergence): + """Tobler's pycnophylactic interpolation (numpy). + + Algorithm + --------- + 1. Initialise: each pixel gets ``zone_value / zone_pixel_count``. + 2. Smooth: replace each pixel with the mean of its 4-connected neighbours + (edge pixels use available neighbours only). + 3. Correct: scale every pixel in each zone so the zone sum matches the + original value exactly. + 4. Repeat 2-3 until the maximum per-pixel change is below *convergence*. 
+ """ + zones = np.asarray(zones_arr, dtype=np.float64) + nrows, ncols = zones.shape + + # identify valid pixels per zone and build initial surface + invalid = np.isnan(zones) + if nodata_zone is not None: + invalid |= (zones == nodata_zone) + + surface = np.full(zones.shape, np.nan, dtype=np.float64) + zone_masks = {} + zone_counts = {} + zone_vals = {} + + for zid, zval in values_dict.items(): + mask = (~invalid) & (zones == zid) + n = int(mask.sum()) + if n == 0: + continue + zone_masks[zid] = mask + zone_counts[zid] = n + zone_vals[zid] = zval + surface[mask] = zval / n + + # pixels that belong to some zone (valid for smoothing) + valid = ~np.isnan(surface) + + for _ in range(max_iterations): + # Laplacian smoothing: mean of 4-connected neighbours + smoothed = np.full_like(surface, np.nan) + neighbour_sum = np.zeros_like(surface) + neighbour_count = np.zeros_like(surface) + + # shift in each direction, accumulate + if nrows > 1: + neighbour_sum[1:, :] += np.where(valid[:-1, :], surface[:-1, :], 0) + neighbour_count[1:, :] += valid[:-1, :].astype(np.float64) + neighbour_sum[:-1, :] += np.where(valid[1:, :], surface[1:, :], 0) + neighbour_count[:-1, :] += valid[1:, :].astype(np.float64) + if ncols > 1: + neighbour_sum[:, 1:] += np.where(valid[:, :-1], surface[:, :-1], 0) + neighbour_count[:, 1:] += valid[:, :-1].astype(np.float64) + neighbour_sum[:, :-1] += np.where(valid[:, 1:], surface[:, 1:], 0) + neighbour_count[:, :-1] += valid[:, 1:].astype(np.float64) + + has_neighbours = neighbour_count > 0 + smoothed[valid & has_neighbours] = ( + neighbour_sum[valid & has_neighbours] + / neighbour_count[valid & has_neighbours] + ) + # pixels with no valid neighbours keep their current value + smoothed[valid & ~has_neighbours] = surface[valid & ~has_neighbours] + + # mass correction: rescale each zone to preserve total + for zid in zone_masks: + mask = zone_masks[zid] + current_sum = np.nansum(smoothed[mask]) + if current_sum != 0: + smoothed[mask] *= zone_vals[zid] / 
current_sum + else: + # all smoothed to zero -- revert to uniform + smoothed[mask] = zone_vals[zid] / zone_counts[zid] + + # convergence check + max_change = np.nanmax(np.abs(smoothed[valid] - surface[valid])) + surface = smoothed + if max_change < convergence: + break + + return surface + + +def pycnophylactic( + zones: xr.DataArray, + values: Union[dict, pd.Series], + max_iterations: int = 100, + convergence: float = 1e-5, + nodata_zone: Union[int, None] = None, + name: str = 'pycnophylactic', +) -> xr.DataArray: + """Tobler's pycnophylactic interpolation preserving zone totals. + + Produces a smooth surface by iteratively applying Laplacian smoothing + and then rescaling each zone so its pixel sum matches the original + value. Unlike :func:`disaggregate`, no ancillary weight raster is + needed -- smoothness alone drives the redistribution. + + Parameters + ---------- + zones : xr.DataArray + 2-D integer raster identifying zone membership for each pixel. + values : dict or pd.Series + Mapping of ``zone_id -> value`` to redistribute. + max_iterations : int, default 100 + Maximum number of smoothing-correction iterations. + convergence : float, default 1e-5 + Stop when the largest per-pixel change is below this threshold. + nodata_zone : int or None, default None + Zone ID to treat as no-data (output NaN). + name : str, default ``'pycnophylactic'`` + Name for the output DataArray. + + Returns + ------- + xr.DataArray + Float raster with the same shape, coordinates, and attributes as + *zones*. Zone totals are preserved (pycnophylactic property). + + Notes + ----- + Currently supports numpy and cupy (via CPU fallback) backends. + Dask arrays raise ``NotImplementedError`` because the algorithm is + inherently iterative and requires global zone sums each iteration. + + References + ---------- + Tobler, W. R. (1979). "Smooth Pycnophylactic Interpolation for + Geographical Regions". *Journal of the American Statistical + Association*, 74(367), 519-530. 
+ + Examples + -------- + >>> import numpy as np, xarray as xr + >>> from xrspatial.dasymetric import pycnophylactic + >>> zones = xr.DataArray(np.array([[1, 1], [2, 2]]), dims=['y', 'x']) + >>> result = pycnophylactic(zones, {1: 100, 2: 50}) + >>> float(np.nansum(result.values[:1, :])) # zone 1 sum + 100.0 + """ + # --- validation --- + if not isinstance(zones, xr.DataArray): + raise TypeError( + f"'zones' must be an xr.DataArray, got {type(zones).__name__}" + ) + if zones.ndim != 2: + raise ValueError( + f"'zones' must be 2-D, got {zones.ndim}-D" + ) + + values_dict = _normalize_values(values) + + _data = zones.data + + # dask not supported (iterative algorithm needs global state each step) + if has_dask_array() and isinstance(_data, da.Array): + raise NotImplementedError( + "pycnophylactic is not supported for dask arrays. " + "Compute zones first with zones.compute()." + ) + + # cupy: CPU fallback + if has_cuda_and_cupy() and is_cupy_array(_data): + zones_np = _data.get() + result_data = _pycnophylactic_numpy( + zones_np, values_dict, nodata_zone, max_iterations, convergence, + ) + result_data = cupy.asarray(result_data) + else: + result_data = _pycnophylactic_numpy( + _data, values_dict, nodata_zone, max_iterations, convergence, + ) + + return xr.DataArray( + result_data, dims=zones.dims, coords=zones.coords, + attrs=zones.attrs, name=name, + ) + + +# --------------------------------------------------------------------------- +# validation helper +# --------------------------------------------------------------------------- + +def validate_disaggregation( + result: xr.DataArray, + zones: xr.DataArray, + values: Union[dict, pd.Series], + tolerance: float = 1e-6, +) -> bool: + """Check that zone totals in *result* match *values* within tolerance. + + Parameters + ---------- + result : xr.DataArray + Output of :func:`disaggregate` or :func:`pycnophylactic`. + zones : xr.DataArray + The zone raster used to produce *result*. 
+ values : dict or pd.Series + The original ``zone_id -> value`` mapping. + tolerance : float, default 1e-6 + Maximum allowed relative error per zone. Zones whose total + weight is zero (result all NaN) are skipped. + + Returns + ------- + bool + ``True`` if all zone totals are within tolerance. + + Raises + ------ + ValueError + If any zone total exceeds the tolerance, with details on + which zones failed. + + Examples + -------- + >>> from xrspatial.dasymetric import disaggregate, validate_disaggregation + >>> result = disaggregate(zones, values, weight) + >>> validate_disaggregation(result, zones, values) + True + """ + values_dict = _normalize_values(values) + + # extract numpy arrays from any backend + result_np = _to_numpy(result.data) + zones_np = _to_numpy(zones.data) + + failures = [] + for zid, expected in values_dict.items(): + mask = zones_np == zid + if not np.any(mask): + continue + actual = float(np.nansum(result_np[mask])) + # skip zones that are all NaN (zero-weight zones) + if np.all(np.isnan(result_np[mask])): + continue + if expected == 0: + if abs(actual) > tolerance: + failures.append((zid, expected, actual)) + else: + rel_err = abs(actual - expected) / abs(expected) + if rel_err > tolerance: + failures.append((zid, expected, actual)) + + if failures: + lines = [f" zone {z}: expected={e}, actual={a}" for z, e, a in failures] + raise ValueError( + f"Zone totals not preserved (tolerance={tolerance}):\n" + + "\n".join(lines) + ) + return True + + +def _to_numpy(data): + """Convert array data from any backend to a numpy ndarray.""" + if has_dask_array() and isinstance(data, da.Array): + data = data.compute() + if has_cuda_and_cupy() and is_cupy_array(data): + data = data.get() + return np.asarray(data) diff --git a/xrspatial/tests/test_dasymetric.py b/xrspatial/tests/test_dasymetric.py new file mode 100644 index 00000000..15ab520b --- /dev/null +++ b/xrspatial/tests/test_dasymetric.py @@ -0,0 +1,623 @@ +"""Tests for xrspatial.dasymetric.""" + 
import numpy as np
import pandas as pd
import pytest
import xarray as xr

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.dasymetric import (
    disaggregate,
    pycnophylactic,
    validate_disaggregation,
)
from xrspatial.tests.general_checks import (
    create_test_raster,
    has_cuda_and_cupy,
)
from xrspatial.utils import has_dask_array


# ---------------------------------------------------------------------------
# fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def simple_zones_data():
    """3x4 zone raster holding zone ids 1, 2 and 3."""
    return np.array([
        [1, 1, 2, 2],
        [1, 1, 2, 2],
        [3, 3, 3, 3],
    ], dtype=np.float64)


@pytest.fixture
def simple_weight_data():
    """3x4 ancillary weight raster with varying values."""
    return np.array([
        [1.0, 3.0, 2.0, 2.0],
        [2.0, 4.0, 1.0, 1.0],
        [0.5, 0.5, 1.0, 2.0],
    ], dtype=np.float64)


@pytest.fixture
def simple_values():
    """Zone id -> aggregate value mapping matching simple_zones_data."""
    return {1: 100.0, 2: 200.0, 3: 40.0}


@pytest.fixture
def simple_zones(simple_zones_data):
    return create_test_raster(simple_zones_data, backend='numpy',
                              chunks=(2, 2))


@pytest.fixture
def simple_weight(simple_weight_data):
    return create_test_raster(simple_weight_data, backend='numpy',
                              chunks=(2, 2))


# ---------------------------------------------------------------------------
# TestWeighted
# ---------------------------------------------------------------------------

class TestWeighted:
    def test_known_values(self, simple_zones, simple_weight, simple_values):
        """Hand-computed proportional shares for every checked pixel."""
        out = disaggregate(simple_zones, simple_values, simple_weight,
                           method='weighted').values

        # zone 1 weights: 1 + 3 + 2 + 4 = 10
        assert out[0, 0] == pytest.approx(100.0 * 1.0 / 10.0)  # 10
        assert out[0, 1] == pytest.approx(100.0 * 3.0 / 10.0)  # 30
        assert out[1, 0] == pytest.approx(100.0 * 2.0 / 10.0)  # 20
        assert out[1, 1] == pytest.approx(100.0 * 4.0 / 10.0)  # 40

        # zone 2 weights: 2 + 2 + 1 + 1 = 6
        assert out[0, 2] == pytest.approx(200.0 * 2.0 / 6.0)
        assert out[0, 3] == pytest.approx(200.0 * 2.0 / 6.0)

        # zone 3 weights: 0.5 + 0.5 + 1 + 2 = 4
        assert out[2, 0] == pytest.approx(40.0 * 0.5 / 4.0)  # 5
        assert out[2, 3] == pytest.approx(40.0 * 2.0 / 4.0)  # 20

    def test_conservation(self, simple_zones, simple_weight, simple_values):
        """Per-zone pixel sums must reproduce the input zone totals."""
        out = disaggregate(simple_zones, simple_values, simple_weight).values
        zone_ids = simple_zones.values

        for zid, expected in simple_values.items():
            zone_total = np.nansum(out[zone_ids == zid])
            assert zone_total == pytest.approx(expected, rel=1e-10)

    def test_uniform_weights(self, simple_zones, simple_values):
        """A constant weight surface gives every pixel an equal share."""
        uniform = create_test_raster(
            np.ones((3, 4), dtype=np.float64), backend='numpy')
        out = disaggregate(simple_zones, simple_values, uniform).values
        zone_ids = simple_zones.values

        # every zone has 4 pixels, so each pixel receives value / 4
        for zid, per_pixel in [(1, 25.0), (2, 50.0), (3, 10.0)]:
            np.testing.assert_allclose(out[zone_ids == zid], per_pixel)

    def test_output_metadata(self, simple_zones, simple_weight, simple_values):
        """Result keeps the dims/shape of zones and takes the given name."""
        out = disaggregate(simple_zones, simple_values, simple_weight,
                           name='pop')
        assert out.dims == simple_zones.dims
        assert out.name == 'pop'
        assert out.shape == simple_zones.shape


# ---------------------------------------------------------------------------
# TestBinary
# ---------------------------------------------------------------------------

class TestBinary:
    def test_known_values(self, simple_zones_data, simple_values):
        """Binary method splits the zone value equally over nonzero pixels."""
        # weight raster with one zero pixel per zone
        weight_data = np.array([
            [1.0, 0.0, 5.0, 3.0],
            [2.0, 7.0, 0.0, 1.0],
            [0.0, 1.0, 1.0, 1.0],
        ], dtype=np.float64)
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')

        out = disaggregate(zones, simple_values, weight,
                           method='binary').values

        # each zone has exactly 3 nonzero-weight pixels -> value / 3 apiece
        for zid, zval in [(1, 100.0), (2, 200.0), (3, 40.0)]:
            hot = (simple_zones_data == zid) & (weight_data != 0)
            np.testing.assert_allclose(out[hot], zval / 3.0)

        # the zero-weight pixel of zone 1 receives nothing
        assert out[0, 1] == pytest.approx(0.0)

    def test_all_nonzero(self, simple_zones, simple_weight, simple_values):
        """When no weight is zero, binary reduces to an equal split."""
        out = disaggregate(simple_zones, simple_values, simple_weight,
                           method='binary').values
        zone_ids = simple_zones.values

        # zone 1 has 4 pixels -> each gets 100 / 4 = 25
        np.testing.assert_allclose(out[zone_ids == 1], 25.0)

    def test_all_zero_zone(self, simple_zones_data, simple_values):
        """A zone whose weights are all zero comes back as NaN."""
        weight_data = np.array([
            [1.0, 1.0, 1.0, 1.0],
            [1.0, 1.0, 1.0, 1.0],
            [0.0, 0.0, 0.0, 0.0],  # zone 3 entirely zero
        ], dtype=np.float64)
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')

        out = disaggregate(zones, simple_values, weight,
                           method='binary').values
        # the whole bottom row (zone 3) must be NaN
        assert np.all(np.isnan(out[2, :]))


# ---------------------------------------------------------------------------
# TestLimitingVariable
# ---------------------------------------------------------------------------

class TestLimitingVariable:
    def test_basic_numpy(self, simple_zones, simple_weight, simple_values):
        """Limiting variable should run on numpy without error."""
        result = disaggregate(simple_zones, simple_values, simple_weight,
                              method='limiting_variable')
        data = result.values
        zones = simple_zones.values

        # conservation should hold
        for zid, zval in simple_values.items():
            total = np.nansum(data[zones == zid])
            assert total == pytest.approx(zval, rel=1e-10)

    @pytest.mark.skipif(not has_dask_array(), reason="dask not available")
    def test_raises_for_dask(self, simple_zones_data, simple_weight_data,
                             simple_values):
        """limiting_variable is numpy-only: dask inputs must raise."""
        zones = create_test_raster(simple_zones_data, backend='dask+numpy',
                                   chunks=(2, 2))
        weight = create_test_raster(simple_weight_data, backend='dask+numpy',
                                    chunks=(2, 2))
        with pytest.raises(NotImplementedError):
            disaggregate(zones, simple_values, weight,
                         method='limiting_variable')

    @pytest.mark.skipif(not has_cuda_and_cupy(),
                        reason="CUDA/CuPy not available")
    def test_raises_for_cupy(self, simple_zones_data, simple_weight_data,
                             simple_values):
        """limiting_variable is numpy-only: cupy inputs must raise."""
        zones = create_test_raster(simple_zones_data, backend='cupy')
        weight = create_test_raster(simple_weight_data, backend='cupy')
        with pytest.raises(NotImplementedError):
            disaggregate(zones, simple_values, weight,
                         method='limiting_variable')


# ---------------------------------------------------------------------------
# TestEdgeCases
# ---------------------------------------------------------------------------

class TestEdgeCases:
    def test_missing_zone_id(self, simple_zones, simple_weight):
        """Zone present in raster but absent from values -> NaN."""
        values = {1: 100.0}  # zones 2 and 3 not in values
        result = disaggregate(simple_zones, values, simple_weight)
        data = result.values
        zones = simple_zones.values
        # zone 1 should have values
        assert not np.any(np.isnan(data[zones == 1]))
        # zones 2, 3 should be NaN
        assert np.all(np.isnan(data[zones == 2]))
        assert np.all(np.isnan(data[zones == 3]))

    def test_nan_zones(self, simple_weight_data, simple_values):
        """NaN in zones raster -> NaN output."""
        zones_data = np.array([
            [1, 1, 2, 2],
            [1, np.nan, 2, 2],
            [3, 3, 3, 3],
        ], dtype=np.float64)
        zones = create_test_raster(zones_data, backend='numpy')
        weight = create_test_raster(simple_weight_data, backend='numpy')
        result = disaggregate(zones, simple_values, weight)
        assert np.isnan(result.values[1, 1])

    def test_nan_weights(self, simple_zones_data, simple_values):
        """NaN in weight raster -> NaN for that pixel."""
        weight_data = np.array([
            [1.0, np.nan, 2.0, 2.0],
            [2.0, 4.0, 1.0, 1.0],
            [0.5, 0.5, 1.0, 2.0],
        ], dtype=np.float64)
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')
        result = disaggregate(zones, simple_values, weight)
        assert np.isnan(result.values[0, 1])

    def test_zero_sum_zone(self, simple_zones_data, simple_values):
        """Zone with all-zero weights -> NaN."""
        weight_data = np.array([
            [0.0, 0.0, 1.0, 1.0],
            [0.0, 0.0, 1.0, 1.0],
            [1.0, 1.0, 1.0, 1.0],
        ], dtype=np.float64)
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')
        result = disaggregate(zones, simple_values, weight)
        data = result.values
        # zone 1 has all-zero weights -> NaN
        assert np.all(np.isnan(data[simple_zones_data == 1]))

    def test_negative_weights(self, simple_zones, simple_values):
        """Negative weights should be clamped to 0."""
        weight_data = np.array([
            [-1.0, 3.0, 2.0, 2.0],
            [2.0, 4.0, 1.0, 1.0],
            [0.5, 0.5, 1.0, 2.0],
        ], dtype=np.float64)
        weight = create_test_raster(weight_data, backend='numpy')
        result = disaggregate(simple_zones, simple_values, weight)
        data = result.values
        # pixel [0,0] had negative weight -> clamped to 0 -> gets 0 value
        assert data[0, 0] == pytest.approx(0.0)

    def test_nodata_zone(self, simple_zones_data, simple_weight_data,
                         simple_values):
        """Pixels matching nodata_zone should be NaN."""
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(simple_weight_data, backend='numpy')
        result = disaggregate(zones, simple_values, weight, nodata_zone=3)
        data = result.values
        # zone 3 should be NaN
        assert np.all(np.isnan(data[simple_zones_data == 3]))
        # zones 1 and 2 should still have values
        assert not np.any(np.isnan(data[simple_zones_data == 1]))

    def test_single_pixel_zone(self):
        """Single-pixel zone should get the full zone value."""
        zones_data = np.array([[1, 2]], dtype=np.float64)
        weight_data = np.array([[5.0, 3.0]], dtype=np.float64)
        zones = create_test_raster(zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')
        result = disaggregate(zones, {1: 100.0, 2: 200.0}, weight)
        assert result.values[0, 0] == pytest.approx(100.0)
        assert result.values[0, 1] == pytest.approx(200.0)

    def test_series_input(self, simple_zones, simple_weight):
        """pd.Series input should work like dict."""
        values_dict = {1: 100.0, 2: 200.0, 3: 40.0}
        values_series = pd.Series(values_dict)
        result_dict = disaggregate(simple_zones, values_dict, simple_weight)
        result_series = disaggregate(simple_zones, values_series, simple_weight)
        np.testing.assert_allclose(result_dict.values, result_series.values,
                                   equal_nan=True)

    def test_extra_key_silently_ignored(self, simple_zones, simple_weight):
        """Value keys absent from zones raster should be silently ignored."""
        values = {1: 100.0, 2: 200.0, 3: 40.0, 999: 5000.0}
        result = disaggregate(simple_zones, values, simple_weight)
        # should not raise; zone 999 doesn't exist, just ignore
        assert result.shape == simple_zones.shape


# ---------------------------------------------------------------------------
# TestValidation
# ---------------------------------------------------------------------------

class TestValidation:
    def test_zones_not_dataarray(self, simple_weight, simple_values):
        with pytest.raises(TypeError, match="zones"):
            disaggregate(np.ones((3, 4)), simple_values, simple_weight)

    def test_weight_not_dataarray(self, simple_zones, simple_values):
        with pytest.raises(TypeError, match="weight"):
            disaggregate(simple_zones, simple_values, np.ones((3, 4)))

    def test_zones_wrong_ndim(self, simple_weight, simple_values):
        zones_3d = xr.DataArray(np.ones((2, 3, 4)), dims=['z', 'y', 'x'])
        with pytest.raises(ValueError, match="2-D"):
            disaggregate(zones_3d, simple_values, simple_weight)

    def test_weight_wrong_ndim(self, simple_zones, simple_values):
        weight_1d = xr.DataArray(np.ones(12), dims=['x'])
        with pytest.raises(ValueError, match="2-D"):
            disaggregate(simple_zones, simple_values, weight_1d)

    def test_shape_mismatch(self, simple_zones, simple_values):
        weight_wrong = xr.DataArray(np.ones((5, 5)), dims=['y', 'x'])
        with pytest.raises(ValueError, match="same shape"):
            disaggregate(simple_zones, simple_values, weight_wrong)

    def test_invalid_method(self, simple_zones, simple_weight, simple_values):
        with pytest.raises(ValueError, match="Invalid method"):
            disaggregate(simple_zones, simple_values, simple_weight,
                         method='invalid')

    def test_values_wrong_type(self, simple_zones, simple_weight):
        with pytest.raises(TypeError, match="dict or pd.Series"):
            disaggregate(simple_zones, [100, 200], simple_weight)


# ---------------------------------------------------------------------------
# TestCrossBackend
# ---------------------------------------------------------------------------

class TestCrossBackend:
    @pytest.mark.skipif(not has_dask_array(), reason="dask not available")
    @pytest.mark.parametrize("method", ['weighted', 'binary'])
    def test_numpy_equals_dask_numpy(self, simple_zones_data,
                                     simple_weight_data, simple_values,
                                     method):
        zones_np = create_test_raster(simple_zones_data, backend='numpy')
        weight_np = create_test_raster(simple_weight_data, backend='numpy')
        zones_dk = create_test_raster(simple_zones_data, backend='dask+numpy',
                                      chunks=(2, 2))
        weight_dk = create_test_raster(simple_weight_data,
                                       backend='dask+numpy', chunks=(2, 2))

        result_np = disaggregate(zones_np, simple_values, weight_np,
                                 method=method)
        result_dk = disaggregate(zones_dk, simple_values, weight_dk,
                                 method=method)

        assert isinstance(result_dk.data, da.Array)
        np.testing.assert_allclose(result_np.values, result_dk.values,
                                   equal_nan=True)

    @pytest.mark.skipif(not has_dask_array(), reason="dask not available")
    def test_dask_conservation(self, simple_zones_data, simple_weight_data,
                               simple_values):
        zones_dk = create_test_raster(simple_zones_data, backend='dask+numpy',
                                      chunks=(2, 2))
        weight_dk = create_test_raster(simple_weight_data,
                                       backend='dask+numpy', chunks=(2, 2))
        result = disaggregate(zones_dk, simple_values, weight_dk)
        data = result.values
        zones = simple_zones_data

        for zid, zval in simple_values.items():
            total = np.nansum(data[zones == zid])
            assert total == pytest.approx(zval, rel=1e-10)

    @pytest.mark.skipif(not has_cuda_and_cupy(),
                        reason="CUDA/CuPy not available")
    @pytest.mark.parametrize("method", ['weighted', 'binary'])
    def test_numpy_equals_cupy(self, simple_zones_data, simple_weight_data,
                               simple_values, method):
        zones_np = create_test_raster(simple_zones_data, backend='numpy')
        weight_np = create_test_raster(simple_weight_data, backend='numpy')
        zones_cu = create_test_raster(simple_zones_data, backend='cupy')
        weight_cu = create_test_raster(simple_weight_data, backend='cupy')

        result_np = disaggregate(zones_np, simple_values, weight_np,
                                 method=method)
        result_cu = disaggregate(zones_cu, simple_values, weight_cu,
                                 method=method)

        np.testing.assert_allclose(result_np.values,
                                   result_cu.data.get(),
                                   equal_nan=True)

    @pytest.mark.skipif(
        not (has_cuda_and_cupy() and has_dask_array()),
        reason="CUDA/CuPy and dask not available")
    @pytest.mark.parametrize("method", ['weighted', 'binary'])
    def test_numpy_equals_dask_cupy(self, simple_zones_data,
                                    simple_weight_data, simple_values,
                                    method):
        zones_np = create_test_raster(simple_zones_data, backend='numpy')
        weight_np = create_test_raster(simple_weight_data, backend='numpy')
        zones_dc = create_test_raster(simple_zones_data,
                                      backend='dask+cupy', chunks=(2, 2))
        weight_dc = create_test_raster(simple_weight_data,
                                       backend='dask+cupy', chunks=(2, 2))

        result_np = disaggregate(zones_np, simple_values, weight_np,
                                 method=method)
        result_dc = disaggregate(zones_dc, simple_values, weight_dc,
                                 method=method)

        # `.values` would call np.asarray on the computed cupy array, which
        # cupy rejects (no implicit device->host conversion); transfer
        # explicitly, matching the plain-cupy test above.
        np.testing.assert_allclose(result_np.values,
                                   result_dc.data.compute().get(),
                                   equal_nan=True)


# ---------------------------------------------------------------------------
# TestPycnophylactic
# ---------------------------------------------------------------------------

class TestPycnophylactic:
    def test_conservation(self, simple_zones, simple_values):
        """Zone totals must be preserved."""
        result = pycnophylactic(simple_zones, simple_values)
        data = result.values
        zones = simple_zones.values

        for zid, zval in simple_values.items():
            total = np.nansum(data[zones == zid])
            assert total == pytest.approx(zval, rel=1e-10)

    def test_output_shape_and_metadata(self, simple_zones, simple_values):
        result = pycnophylactic(simple_zones, simple_values, name='smooth')
        assert result.shape == simple_zones.shape
        assert result.dims == simple_zones.dims
        assert result.name == 'smooth'

    def test_single_zone_uniform(self):
        """A single zone should converge to a uniform surface."""
        zones_data = np.ones((5, 5), dtype=np.float64)
        zones = create_test_raster(zones_data, backend='numpy')
        result = pycnophylactic(zones, {1: 100.0}, convergence=1e-10)
        # all pixels should be 100/25 = 4.0
        np.testing.assert_allclose(result.values, 4.0, atol=1e-8)

    def test_nodata_zone(self, simple_zones, simple_values):
        """Nodata zone pixels should be NaN."""
        result = pycnophylactic(simple_zones, simple_values, nodata_zone=3)
        data = result.values
        zones = simple_zones.values
        assert np.all(np.isnan(data[zones == 3]))
        # other zones still conserve
        for zid in [1, 2]:
            total = np.nansum(data[zones == zid])
            assert total == pytest.approx(simple_values[zid], rel=1e-10)

    def test_smoothness(self):
        """Result should be smoother than uniform initial -- check that
        adjacent pixels within a zone don't have wild jumps."""
        zones_data = np.array([
            [1, 1, 1, 1, 2, 2, 2, 2],
            [1, 1, 1, 1, 2, 2, 2, 2],
            [1, 1, 1, 1, 2, 2, 2, 2],
            [1, 1, 1, 1, 2, 2, 2, 2],
        ], dtype=np.float64)
        zones = create_test_raster(zones_data, backend='numpy')
        result = pycnophylactic(zones, {1: 160.0, 2: 320.0},
                                max_iterations=200, convergence=1e-8)
        data = result.values
        # pixel-to-pixel differences should be small
        dy = np.abs(np.diff(data, axis=0))
        dx = np.abs(np.diff(data, axis=1))
        # max gradient should be modest relative to the mean value
        mean_val = np.nanmean(data)
        assert np.nanmax(dy) < mean_val
        assert np.nanmax(dx) < mean_val

    def test_missing_zone_produces_nan(self, simple_zones):
        """Zones absent from values dict should be NaN in output."""
        result = pycnophylactic(simple_zones, {1: 100.0})
        data = result.values
        zones = simple_zones.values
        assert not np.any(np.isnan(data[zones == 1]))
        assert np.all(np.isnan(data[zones == 2]))
        assert np.all(np.isnan(data[zones == 3]))

    def test_series_input(self, simple_zones, simple_values):
        """pd.Series should work like dict."""
        result_dict = pycnophylactic(simple_zones, simple_values)
        result_series = pycnophylactic(simple_zones, pd.Series(simple_values))
        np.testing.assert_allclose(result_dict.values, result_series.values,
                                   equal_nan=True)

    @pytest.mark.skipif(not has_dask_array(), reason="dask not available")
    def test_raises_for_dask(self, simple_zones_data, simple_values):
        zones = create_test_raster(simple_zones_data, backend='dask+numpy',
                                   chunks=(2, 2))
        with pytest.raises(NotImplementedError, match="dask"):
            pycnophylactic(zones, simple_values)

    @pytest.mark.skipif(not has_cuda_and_cupy(),
                        reason="CUDA/CuPy not available")
    def test_cupy_matches_numpy(self, simple_zones_data, simple_values):
        zones_np = create_test_raster(simple_zones_data, backend='numpy')
        zones_cu = create_test_raster(simple_zones_data, backend='cupy')
        result_np = pycnophylactic(zones_np, simple_values)
        result_cu = pycnophylactic(zones_cu, simple_values)
        np.testing.assert_allclose(result_np.values, result_cu.data.get(),
                                   equal_nan=True)

    def test_validation(self):
        """Wrong types and dims should raise."""
        with pytest.raises(TypeError, match="zones"):
            pycnophylactic(np.ones((3, 4)), {1: 100.0})
        zones_3d = xr.DataArray(np.ones((2, 3, 4)), dims=['z', 'y', 'x'])
        with pytest.raises(ValueError, match="2-D"):
            pycnophylactic(zones_3d, {1: 100.0})


# ---------------------------------------------------------------------------
# TestValidateDisaggregation
# ---------------------------------------------------------------------------

class TestValidateDisaggregation:
    def test_valid_result(self, simple_zones, simple_weight, simple_values):
        result = disaggregate(simple_zones, simple_values, simple_weight)
        assert validate_disaggregation(result, simple_zones, simple_values)

    def test_valid_pycnophylactic(self, simple_zones, simple_values):
        result = pycnophylactic(simple_zones, simple_values)
        assert validate_disaggregation(result, simple_zones, simple_values)

    def test_invalid_result_raises(self, simple_zones, simple_values):
        """Manually corrupted result should fail validation."""
        zones_data = simple_zones.values
        bad_data = np.ones_like(zones_data, dtype=np.float64)
        bad_result = xr.DataArray(bad_data, dims=simple_zones.dims,
                                  coords=simple_zones.coords)
        with pytest.raises(ValueError, match="Zone totals not preserved"):
            validate_disaggregation(bad_result, simple_zones, simple_values)

    def test_skips_all_nan_zones(self, simple_zones_data, simple_values):
        """Zones that are all NaN in result should be skipped."""
        # make zone 1 all-zero weights -> all NaN in result
        weight_data = np.array([
            [0.0, 0.0, 1.0, 1.0],
            [0.0, 0.0, 1.0, 1.0],
            [1.0, 1.0, 1.0, 1.0],
        ], dtype=np.float64)
        zones = create_test_raster(simple_zones_data, backend='numpy')
        weight = create_test_raster(weight_data, backend='numpy')
        result = disaggregate(zones, simple_values, weight)
        # zone 1 is all NaN, but validation should still pass
        # (only zones 2 and 3 are checked)
        assert validate_disaggregation(result, zones, simple_values)

    def test_tight_tolerance(self, simple_zones, simple_weight, simple_values):
        """Validation should pass under a very tight tolerance."""
        result = disaggregate(simple_zones, simple_values, simple_weight)
        # the weighted method conserves totals to near machine precision,
        # so a 1e-10 relative tolerance should still pass
        assert validate_disaggregation(result, simple_zones, simple_values,
                                       tolerance=1e-10)

    def test_series_values(self, simple_zones, simple_weight):
        """pd.Series values should work."""
        values = pd.Series({1: 100.0, 2: 200.0, 3: 40.0})
        result = disaggregate(simple_zones, values, simple_weight)
        assert validate_disaggregation(result, simple_zones, values)

    @pytest.mark.skipif(not has_dask_array(), reason="dask not available")
    def test_dask_result(self, simple_zones_data, simple_weight_data,
                         simple_values):
        """Should handle dask-backed result arrays."""
        zones = create_test_raster(simple_zones_data, backend='dask+numpy',
                                   chunks=(2, 2))
        weight = create_test_raster(simple_weight_data, backend='dask+numpy',
                                    chunks=(2, 2))
        result = disaggregate(zones, simple_values, weight)
        zones_np = create_test_raster(simple_zones_data, backend='numpy')
        assert validate_disaggregation(result, zones_np, simple_values)

    @pytest.mark.skipif(not has_cuda_and_cupy(),
                        reason="CUDA/CuPy not available")
    def test_cupy_result(self, simple_zones_data, simple_weight_data,
                         simple_values):
        """Should handle cupy-backed result arrays."""
        zones = create_test_raster(simple_zones_data, backend='cupy')
        weight = create_test_raster(simple_weight_data, backend='cupy')
        result = disaggregate(zones, simple_values, weight)
        assert validate_disaggregation(result, zones, simple_values)