Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 59 additions & 23 deletions docs/examples/calculate-forest-metrics.ipynb

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions docs/usage/forest-structure/canopy_cover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Canopy Cover (GEDI-style)

## Definition

Canopy cover at height z is the fraction of ground area occluded by vegetation above z meters (Height Above Ground).
Following GEDI, we model the gap probability using the Beer–Lambert law:

Cover(z) = 1 − exp(−k · PAI_above(z))

Where:
- `k` is the extinction coefficient (default 0.5 for spherical leaf-angle assumption).
- `PAI_above(z)` is the vertical integral of Plant Area Density (PAD) above height `z`.

Common choices:
- Tree canopy cover (GEDI-like): z = 2.0 m.
- Total vegetative cover: z = 0.0 m.

## Usage

```python
from pyforestscan.handlers import read_lidar
from pyforestscan.filters import filter_hag
from pyforestscan.calculate import assign_voxels, calculate_pad, calculate_canopy_cover
from pyforestscan.visualize import plot_metric

file_path = "../example_data/20191210_5QKB020880.laz"

# Ensure HAG is present for PAD/PAI calculations
arrays = read_lidar(file_path, "EPSG:32605", hag=True)
arrays = filter_hag(arrays)
points = arrays[0]

voxel_resolution = (5, 5, 1) # dx, dy, dz in meters
voxels, extent = assign_voxels(points, voxel_resolution)

pad = calculate_pad(voxels, voxel_resolution[-1])

# GEDI-like canopy cover at z=2 m
cover = calculate_canopy_cover(pad, voxel_height=voxel_resolution[-1], min_height=2.0, k=0.5)

plot_metric('Canopy Cover (z=2 m)', cover, extent, metric_name='Cover', cmap='viridis')
```

## Notes

- As z increases, canopy cover decreases because less foliage lies above higher thresholds.
- `calculate_canopy_cover` returns NaN where PAD is entirely missing in the integration range.
- When tiling large EPT datasets, use `process_with_tiles(..., metric="cover", voxel_height=..., cover_min_height=2.0, cover_k=0.5)` to write GeoTIFF tiles.

1 change: 1 addition & 0 deletions docs/usage/forest-structure/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Most calculations for the metrics follows the method outlined in Kamoske et al.
* [Plant Area Density (PAD)](pad.md)
* [Plant Area Index (PAI)](pai.md)
* [Foliage Height Diversity (FHD)](fhd.md)
* [Canopy Cover](canopy_cover.md)

## References

Expand Down
19 changes: 19 additions & 0 deletions pyforestscan/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from .calculate import (
assign_voxels,
calculate_pad,
calculate_pai,
calculate_fhd,
calculate_chm,
generate_dtm,
calculate_canopy_cover,
)

__all__ = [
"assign_voxels",
"calculate_pad",
"calculate_pai",
"calculate_fhd",
"calculate_chm",
"generate_dtm",
"calculate_canopy_cover",
]
60 changes: 59 additions & 1 deletion pyforestscan/calculate.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def calculate_pad(voxel_returns,
if drop_ground:
pad[:, :, 0] = np.nan

pad[voxel_returns[:, :, 0] == 0, :] = np.nan
# Mask only columns that have zero returns across all Z (true empty columns)
empty_columns = (np.sum(voxel_returns, axis=2) == 0)
pad[empty_columns, :] = np.nan

return pad

Expand Down Expand Up @@ -168,6 +170,62 @@ def calculate_pai(pad,
return pai


def calculate_canopy_cover(pad: np.ndarray,
                           voxel_height: float,
                           min_height: float = 2.0,
                           max_height: float | None = None,
                           k: float = 0.5) -> np.ndarray:
    """
    Calculate GEDI-style canopy cover at a height threshold using PAD.

    Uses the Beer–Lambert relation: Cover(z) = 1 - exp(-k * PAI_above(z)), where
    PAI_above(z) is the integrated Plant Area Index above height z.

    Args:
        pad (np.ndarray): 3D array of PAD values with shape (X, Y, Z).
        voxel_height (float): Height of each voxel in meters (> 0).
        min_height (float, optional): Height-above-ground threshold z (in meters) at which
            to compute canopy cover. Defaults to 2.0 m (GEDI convention).
        max_height (float or None, optional): Maximum height to integrate up to. If None,
            integrates to the top of the PAD volume. Defaults to None.
        k (float, optional): Extinction coefficient (Beer–Lambert constant). Defaults to 0.5.

    Returns:
        np.ndarray: 2D array (X, Y) of canopy cover values in [0, 1], with NaN where
        PAD is entirely missing for the integration range.

    Raises:
        ValueError: If parameters are invalid (e.g., non-positive voxel_height, k < 0,
            or min_height >= max_height).
    """
    if voxel_height <= 0:
        raise ValueError(f"voxel_height must be > 0 metres (got {voxel_height})")
    if k < 0:
        raise ValueError(f"k must be >= 0 (got {k})")

    # Resolve the integration ceiling and validate the range *before* doing any
    # work, so an invalid range fails fast instead of after integrating PAI.
    top = pad.shape[2] * voxel_height if max_height is None else max_height
    if min_height >= top:
        raise ValueError("Minimum height index must be less than maximum height index.")

    # PAI integrated from min_height up to max_height (None -> top of volume).
    pai_above = calculate_pai(pad, voxel_height, min_height=min_height, max_height=max_height)

    # Columns whose PAD is entirely NaN within the integration range carry no
    # information; they must come out as NaN rather than 0 cover.
    start_idx = int(np.ceil(min_height / voxel_height))
    end_idx = int(np.floor(top / voxel_height))
    all_nan_mask = np.all(np.isnan(pad[:, :, start_idx:end_idx]), axis=2)

    # Beer–Lambert canopy cover.
    cover = 1.0 - np.exp(-k * pai_above)

    # Clamp to [0, 1]; non-finite PAI propagates as NaN.
    cover = np.where(np.isfinite(cover), cover, np.nan)
    cover = np.clip(cover, 0.0, 1.0)
    cover[all_nan_mask] = np.nan
    return cover


def calculate_fhd(voxel_returns) -> np.ndarray:
"""
Calculate the Foliage Height Diversity (FHD) for a given set of voxel returns.
Expand Down
29 changes: 24 additions & 5 deletions pyforestscan/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from tqdm import tqdm

from pyforestscan.calculate import calculate_fhd, calculate_pad, calculate_pai, assign_voxels, calculate_chm
from pyforestscan.calculate import calculate_fhd, calculate_pad, calculate_pai, assign_voxels, calculate_chm, calculate_canopy_cover
from pyforestscan.filters import remove_outliers_and_clean
from pyforestscan.handlers import create_geotiff
from pyforestscan.pipeline import _hag_raster, _hag_delaunay
Expand Down Expand Up @@ -50,7 +50,8 @@ def _crop_dtm(dtm_path, tile_min_x, tile_min_y, tile_max_x, tile_max_y):

def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,
voxel_height=1, buffer_size=0.1, srs=None, hag=False,
hag_dtm=False, dtm=None, bounds=None, interpolation=None, remove_outliers=False) -> None:
hag_dtm=False, dtm=None, bounds=None, interpolation=None, remove_outliers=False,
cover_min_height: float = 2.0, cover_k: float = 0.5) -> None:
"""
Process a large EPT point cloud by tiling, compute CHM or other metrics for each tile,
and write the results to the specified output directory.
Expand All @@ -59,7 +60,7 @@ def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,
ept_file (str): Path to the EPT file containing the point cloud data.
tile_size (tuple): Size of each tile as (tile_width, tile_height).
output_path (str): Directory where the output files will be saved.
metric (str): Metric to compute for each tile ("chm", "fhd", or "pai").
metric (str): Metric to compute for each tile ("chm", "fhd", "pai", or "cover").
voxel_size (tuple): Voxel resolution as (x_resolution, y_resolution, z_resolution).
voxel_height (float, optional): Height of each voxel in meters. Required if metric is "pai".
buffer_size (float, optional): Fractional buffer size relative to tile size (e.g., 0.1 for 10% buffer). Defaults to 0.1.
Expand All @@ -72,6 +73,8 @@ def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,
If None, tiling is done over the entire dataset.
interpolation (str or None, optional): Interpolation method for CHM calculation ("linear", "cubic", "nearest", or None).
remove_outliers (bool, optional): Whether to remove statistical outliers before calculating metrics. Defaults to False.
cover_min_height (float, optional): Height threshold (in meters) for canopy cover (used when metric == "cover"). Defaults to 2.0.
cover_k (float, optional): Beer–Lambert extinction coefficient for canopy cover. Defaults to 0.5.

Returns:
None
Expand All @@ -80,7 +83,7 @@ def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,
ValueError: If an unsupported metric is requested, if buffer or voxel sizes are invalid, or required arguments are missing.
FileNotFoundError: If the EPT or DTM file does not exist, or a required file for processing is missing.
"""
if metric not in ["chm", "fhd", "pai"]:
if metric not in ["chm", "fhd", "pai", "cover"]:
raise ValueError(f"Unsupported metric: {metric}")

(min_z, max_z) = (None, None)
Expand Down Expand Up @@ -188,7 +191,7 @@ def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,

result_file = os.path.join(output_path, f"tile_{i}_{j}_chm.tif")
create_geotiff(chm, result_file, srs, core_extent)
elif metric in ["fhd", "pai"]:
elif metric in ["fhd", "pai", "cover"]:
voxels, spatial_extent = assign_voxels(tile_points, voxel_size)

if metric == "fhd":
Expand All @@ -204,6 +207,22 @@ def process_with_tiles(ept_file, tile_size, output_path, metric, voxel_size,
else:
result = calculate_pai(pad, voxel_height)
result = np.where(np.isfinite(result), result, 0)
elif metric == "cover":
if not voxel_height:
raise ValueError(f"voxel_height is required for metric {metric}")

pad = calculate_pad(voxels, voxel_size[-1])
if np.all(pad == 0):
result = np.zeros((pad.shape[0], pad.shape[1]))
else:
result = calculate_canopy_cover(
pad,
voxel_height=voxel_height,
min_height=cover_min_height,
max_height=None,
k=cover_k,
)
result = np.where(np.isfinite(result), result, 0)

if current_buffer_size > 0:
if buffer_pixels_x * 2 >= result.shape[1] or buffer_pixels_y * 2 >= result.shape[0]:
Expand Down
30 changes: 23 additions & 7 deletions pyforestscan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,41 @@ def get_srs_from_ept(ept_file) -> str or None:

def get_bounds_from_ept(ept_file) -> tuple[float, float, float, float, float, float]:
    """
    Extract dataset bounds from an EPT (Entwine Point Tile) source.

    Args:
        ept_file (str): Path or URL to the EPT JSON.

    Returns:
        tuple: (min_x, max_x, min_y, max_y, min_z, max_z)

    Raises:
        KeyError: If bounds information is not available in the EPT metadata.
        ValueError: If the EPT bounds are malformed.
    """
    ept_json = _read_ept_json(ept_file)

    # Prefer "bounds"; some datasets may also include "boundsConforming".
    raw_bounds = ept_json.get("bounds")
    if raw_bounds is None:
        raw_bounds = ept_json.get("boundsConforming")
    if raw_bounds is None:
        raise KeyError("Bounds information is not available in the ept metadata.")

    # Handle common representations: list/tuple of 6 numbers (EPT stores them
    # as [minx, miny, minz, maxx, maxy, maxz]) or a dict with per-axis keys.
    if isinstance(raw_bounds, (list, tuple)) and len(raw_bounds) == 6:
        min_x, min_y, min_z, max_x, max_y, max_z = raw_bounds
    elif isinstance(raw_bounds, dict):
        try:
            min_x = raw_bounds["minx"]
            min_y = raw_bounds["miny"]
            min_z = raw_bounds["minz"]
            max_x = raw_bounds["maxx"]
            max_y = raw_bounds["maxy"]
            max_z = raw_bounds["maxz"]
        except KeyError as e:
            raise ValueError(f"Malformed EPT bounds dictionary: {e}") from None
    else:
        raise ValueError(f"Unexpected EPT bounds format: {type(raw_bounds)}")

    return (min_x, max_x, min_y, max_y, min_z, max_z)


def tile_las_in_memory(
las_file,
Expand Down
49 changes: 47 additions & 2 deletions tests/test_calculate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
calculate_pad,
calculate_pai,
calculate_fhd,
calculate_chm
calculate_chm,
calculate_canopy_cover
)
import math

Expand Down Expand Up @@ -407,8 +408,13 @@ def test_calculate_fhd_high_diversity():


def test_calculate_fhd_partial_zero_distribution():
voxel_returns = np.random.randint(0, 5, size=(5, 5, 5))
# Construct deterministic columns to avoid flaky equality due to randomness.
voxel_returns = np.zeros((5, 5, 5), dtype=int)
# Column with all mass in a single bin -> entropy 0
voxel_returns[0, 0, :] = [1, 0, 0, 0, 0]
# Column with uniform distribution across bins -> maximal entropy
voxel_returns[1, 1, :] = [1, 1, 1, 1, 1]

fhd = calculate_fhd(voxel_returns)
assert isinstance(fhd, np.ndarray)
assert fhd.shape == (5, 5)
Expand Down Expand Up @@ -601,3 +607,42 @@ def test_calculate_chm_large_heights():
assert isinstance(chm, np.ndarray)
assert chm.ndim == 2
assert np.all(chm >= 1000)


# ----------------------------
# Tests for calculate_canopy_cover
# ----------------------------

def test_calculate_canopy_cover_basic():
    # Uniform PAD of 1.0: integrating above z=2 m over a 10-layer column with
    # 1 m voxels gives PAI_above = 8, so the expected cover is analytic.
    pad = np.ones((4, 3, 10), dtype=float)
    extinction = 0.5
    threshold = 2.0
    expected = 1.0 - np.exp(-extinction * 8.0)
    cov = calculate_canopy_cover(pad, 1.0, min_height=threshold, k=extinction)
    assert cov.shape == (4, 3)
    assert np.allclose(cov, expected)


def test_calculate_canopy_cover_zero_pad():
    # No plant area anywhere -> exp(0) -> cover must be exactly zero.
    empty_pad = np.zeros((2, 2, 5), dtype=float)
    result = calculate_canopy_cover(empty_pad, voxel_height=1.0, min_height=2.0, k=0.5)
    assert np.all(result == 0.0)


def test_calculate_canopy_cover_nans_propagate():
    # Every voxel at or above the 3 m threshold is NaN, so the whole
    # integration range is missing -> the cover map must be NaN everywhere.
    pad = np.ones((2, 2, 6), dtype=float)
    pad[..., 3:] = np.nan
    cover = calculate_canopy_cover(pad, voxel_height=1.0, min_height=3.0, k=0.5)
    assert np.isnan(cover).all()


def test_calculate_canopy_cover_monotonic_with_height():
    # Raising the threshold leaves less foliage above it, so cover can only
    # decrease as z grows.
    pad = np.ones((3, 3, 10), dtype=float)
    covers = [
        calculate_canopy_cover(pad, voxel_height=1.0, min_height=z, k=0.5)
        for z in (0.0, 2.0, 5.0)
    ]
    assert np.all(covers[0] >= covers[1])
    assert np.all(covers[1] >= covers[2])
9 changes: 7 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ def test_get_srs_from_usgs_hawaii_ept():
def test_get_bounds_from_usgs_hawaii_ept():
    """
    Tests retrieving the EPT bounds from a real EPT JSON.
    get_bounds_from_ept returns (min_x, max_x, min_y, max_y, min_z, max_z).
    """
    ept_url = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/HI_Hawaii_Island_2017/ept.json"
    bounds = get_bounds_from_ept(ept_url)
    assert len(bounds) == 6, "Bounds should be (minX, maxX, minY, maxY, minZ, maxZ)."
    expected = (-17405012, -17229390, 2136822, 2312444, -5677, 169945)
    assert tuple(bounds) == expected

    # Sanity check: each axis must be ordered min < max.
    min_x, max_x, min_y, max_y, min_z, max_z = bounds
    assert min_x < max_x and min_y < max_y and min_z < max_z


def test_tile_las_in_memory():
Expand Down
Loading