diff --git a/src/dolphin/io/__init__.py b/src/dolphin/io/__init__.py index 7fef4b444..3f6ffa4fa 100644 --- a/src/dolphin/io/__init__.py +++ b/src/dolphin/io/__init__.py @@ -1,5 +1,6 @@ from ._background import * from ._blocks import * from ._core import * +from ._process import * from ._readers import * from ._writers import * diff --git a/src/dolphin/io/_process.py b/src/dolphin/io/_process.py new file mode 100644 index 000000000..06253c329 --- /dev/null +++ b/src/dolphin/io/_process.py @@ -0,0 +1,59 @@ +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Protocol, Sequence + +from numpy.typing import ArrayLike +from tqdm.auto import tqdm + +from dolphin.utils import DummyProcessPoolExecutor + +from ._blocks import iter_blocks +from ._readers import StackReader +from ._writers import DatasetWriter + +__all__ = ["BlockProcessor", "process_blocks"] + + +class BlockProcessor(Protocol): + """Protocol for a block-wise processing function. + + Reads a block of data from each reader, processes it, and returns the result + as an array-like object. + """ + + def __call__( + self, readers: Sequence[StackReader], rows: slice, cols: slice + ) -> tuple[ArrayLike, slice, slice]: + ... + + +def process_blocks( + readers: Sequence[StackReader], + writer: DatasetWriter, + func: BlockProcessor, + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, +): + """Perform block-wise processing over blocks in `readers`, writing to `writer`. + + Used to read and process a stack of rasters in parallel, setting up a queue + of results for the `writer` to save. + + Note that the parallelism happens using a `ThreadPoolExecutor`, so `func` should + be a function which releases the GIL during computation (e.g. using numpy). + """ + shape = readers[0].shape[-2:] + slices = list(iter_blocks(shape, block_shape=block_shape)) + + pbar = tqdm(total=len(slices)) + + # Define the callback to write the result to an output DatasetWrite + def write_callback(fut: Future): + data, rows, cols = fut.result() + writer[..., rows, cols] = data + pbar.update() + + Executor = ThreadPoolExecutor if num_threads > 1 else DummyProcessPoolExecutor + with Executor(num_threads) as exc: + for rows, cols in slices: + future = exc.submit(func, readers=readers, rows=rows, cols=cols) + future.add_done_callback(write_callback) diff --git a/src/dolphin/io/_writers.py b/src/dolphin/io/_writers.py index 78bd4f52a..71c2d3fe6 100644 --- a/src/dolphin/io/_writers.py +++ b/src/dolphin/io/_writers.py @@ -15,6 +15,7 @@ import numpy as np import rasterio +import rasterio.errors from numpy.typing import ArrayLike, DTypeLike from rasterio.windows import Window @@ -304,13 +305,18 @@ def __setitem__(self, key: tuple[Index, ...], value: np.ndarray, /) -> None: elif len(key) == 3: _, rows, cols = _unpack_3d_slices(key) else: - raise ValueError(f"Invalid key: {key!r}") - window = Window.from_slices( - rows, - cols, - height=dataset.height, - width=dataset.width, - ) + raise ValueError( + f"Invalid key for {self.__class__!r}.__setitem__: {key!r}" + ) + try: + window = Window.from_slices( + rows, + cols, + height=dataset.height, + width=dataset.width, + ) + except rasterio.errors.WindowError as e: + raise ValueError(f"Error creating window: {key = }, {value = }") from e return dataset.write(value, self.band, window=window) diff --git a/src/dolphin/similarity.py b/src/dolphin/similarity.py new file mode 100644 index 000000000..fd742f7b5 --- /dev/null +++ b/src/dolphin/similarity.py @@ -0,0 +1,325 @@ +"""Module for computing phase similarity between complex interferogram pixels. + +Uses metric from @[Wang2022AccuratePersistentScatterer] for similarity. +""" + +import logging +from pathlib import Path +from typing import Callable, Literal, Sequence + +import numba +import numpy as np +from numpy.typing import ArrayLike + +from dolphin._types import PathOrStr + +logger = logging.getLogger(__name__) + + +@numba.njit(nogil=True) +def phase_similarity(x1: ArrayLike, x2: ArrayLike): + """Compute the similarity between two complex 1D vectors.""" + n = len(x1) + out = 0.0 + for i in range(n): + out += np.real(x1[i] * np.conj(x2[i])) + return out / n + + +def median_similarity( + ifg_stack: ArrayLike, search_radius: int, mask: ArrayLike | None = None +): + """Compute the median similarity of each pixel and its neighbors. + + Resulting similarity matches Equation (5) of @[Wang2022AccuratePersistentScatterer] + + Parameters + ---------- + ifg_stack : ArrayLike + 3D stack of complex interferograms. + Shape is (n_ifg, rows, cols) + search_radius: int + maximum radius (in pixels) to search for neighbors when comparing each pixel. + mask: ArrayLike (optional) + Array of mask from True/False indicating whether to ignore the pixel + + Returns + ------- + np.ndarray + 2D array (shape (rows, cols)) of the median similarity at each pixel. + + """ + return _create_loop_and_run( + ifg_stack=ifg_stack, + search_radius=search_radius, + mask=mask, + func=np.nanmedian, + ) + + +def max_similarity( + ifg_stack: ArrayLike, search_radius: int, mask: ArrayLike | None = None +): + """Compute the maximum similarity of each pixel and its neighbors. + + Resulting similarity matches Equation (6) of @[Wang2022AccuratePersistentScatterer] + + Parameters + ---------- + ifg_stack : ArrayLike + 3D stack of complex interferograms. + Shape is (n_ifg, rows, cols) + search_radius: int + maximum radius (in pixels) to search for neighbors when comparing each pixel. + mask: ArrayLike (optional) + Array of mask from True/False indicating whether to ignore the pixel + + Returns + ------- + np.ndarray + 2D array (shape (rows, cols)) of the maximum similarity for any neighrbor + at a pixel. + + """ + return _create_loop_and_run( + ifg_stack=ifg_stack, + search_radius=search_radius, + mask=mask, + func=np.nanmax, + ) + + +def _create_loop_and_run( + ifg_stack: ArrayLike, + search_radius: int, + mask: ArrayLike, + func: Callable[[ArrayLike], np.ndarray], +): + n_ifg, rows, cols = ifg_stack.shape + if not np.iscomplexobj(ifg_stack): + raise ValueError("ifg_stack must be complex") + + unit_ifgs = np.exp(1j * np.angle(ifg_stack)) + out_similarity = np.zeros((rows, cols), dtype="float32") + if mask is None: + mask = np.ones((rows, cols), dtype="bool") + + if mask.shape != (rows, cols): + raise ValueError(f"{ifg_stack.shape = }, but {mask.shape = }") + + idxs = get_circle_idxs(search_radius) + loop_func = _make_loop_function(func) + return loop_func(unit_ifgs, idxs, mask, out_similarity) + + +def _make_loop_function( + summary_func: Callable[[ArrayLike], np.ndarray], +): + """Create a JIT-ed function for some summary of the neighbors's similarity. + + E.g.: for median similarity, call + + median_sim = _make_loop_function(np.median) + """ + + @numba.njit(nogil=True, parallel=True) + def _masked_median_sim_loop( + ifg_stack: np.ndarray, + idxs: np.ndarray, + mask: np.ndarray, + out_similarity: np.ndarray, + ) -> np.ndarray: + """Loop over each pixel, make a masked phase similarity to its neighbors.""" + _, rows, cols = ifg_stack.shape + + num_compare_pixels = len(idxs) + # Buffer to hold all comparison during the parallel loop + cur_sim = np.zeros((rows, cols, num_compare_pixels)) + + for r0 in numba.prange(rows): + for c0 in range(cols): + # Get the current pixel + m0 = mask[r0, c0] + if not m0: + continue + x0 = ifg_stack[:, r0, c0] + cur_sim_vec = cur_sim[r0, c0] + count = 0 + + # compare to all pixels in the circle around it + for i_idx in range(num_compare_pixels): + ir, ic = idxs[i_idx] + # Clip to the image bounds + r = max(min(r0 + ir, rows - 1), 0) + c = max(min(c0 + ic, cols - 1), 0) + if r == r0 and c == c0: + continue + + # Check for a pixel to ignore + if not mask[r, c]: + continue + + x = ifg_stack[:, r, c] + # cur_sim_vec[count] = w * phase_similarity(x0, x) + cur_sim_vec[count] = phase_similarity(x0, x) + count += 1 + # Assuming `summary_func` is nan-aware + out_similarity[r0, c0] = summary_func(cur_sim_vec[:count]) + return out_similarity + + return _masked_median_sim_loop + + +def get_circle_idxs( + max_radius: int, min_radius: int = 0, sort_output: bool = True +) -> np.ndarray: + """Get the relative indices of neighboring pixels in a circle. + + Adapted from c++ version of `psps` package: + https://github.com/UT-Radar-Interferometry-Group/psps/blob/a15d458817fe7d06a6edaa0b3208ea78bc4782e7/src/cpp/similarity.cpp#L16 + """ + # using the mid-point circle drawing algorithm to search for neighboring PS pixels + # # code adapted from "https://www.geeksforgeeks.org/mid-point-circle-drawing-algorithm/" + visited = np.zeros((max_radius, max_radius), dtype=bool) + visited[0][0] = True + + indices = [] + for r in range(1, max_radius): + x = r + y = 0 + p = 1 - r + if r > min_radius: + indices.append([r, 0]) + indices.append([-r, 0]) + indices.append([0, r]) + indices.append([0, -r]) + + visited[r][0] = True + visited[0][r] = True + # flag > 0 means there are holes between concentric circles + flag = 0 + while x > y: + # do not need to fill holes + if flag == 0: + y += 1 + if p <= 0: + # Mid-point is inside or on the perimeter + p += 2 * y + 1 + else: + # Mid-point is outside the perimeter + x -= 1 + p += 2 * y - 2 * x + 1 + + else: + flag -= 1 + + # All the perimeter points have already been visited + if x < y: + break + + while not visited[x - 1][y]: + x -= 1 + flag += 1 + + visited[x][y] = True + visited[y][x] = True + if r > min_radius: + indices.append([x, y]) + indices.append([-x, -y]) + indices.append([x, -y]) + indices.append([-x, y]) + + if x != y: + indices.append([y, x]) + indices.append([-y, -x]) + indices.append([y, -x]) + indices.append([-y, x]) + + if flag > 0: + x += 1 + + if sort_output: + # Sorting makes it run faster, better data access patterns + return np.array(sorted(indices)) + else: + # Indices run from middle outward + return np.array(indices) + + +def create_similarities( + ifg_file_list: Sequence[PathOrStr], + output_file: PathOrStr, + search_radius: int = 7, + sim_type: Literal["median", "max"] = "median", + block_shape: tuple[int, int] = (512, 512), + num_threads: int = 5, + add_overviews: bool = True, +): + """Create a similarity raster from as stack of ifg files. + + Parameters + ---------- + ifg_file_list : Sequence[PathOrStr] + Paths to input interferograms + output_file : PathOrStr + Output raster path + search_radius : int, optional + Maximum radius to search for pixels, by default 7 + sim_type : str, optional + Type of similarity function to run, by default "median" + Choices: "median", "max" + block_shape : tuple[int, int], optional + Size of blocks to process at one time from `ifg_file_list` + by default (512, 512) + num_threads : int, optional + Number of parallel blocks to process, by default 5 + add_overviews : bool, optional + Whether to create overviews in `output_file` by default True + + """ + from dolphin._overviews import Resampling, create_image_overviews + from dolphin.io import BackgroundRasterWriter, VRTStack, process_blocks + + if sim_type == "median": + sim_function = median_similarity + elif sim_type == "max": + sim_function = max_similarity + else: + raise ValueError(f"Unrecognized {sim_type = }") + + zero_block = np.zeros(block_shape, dtype="float32") + + def calc_sim(readers, rows, cols): + block = readers[0][:, rows, cols] + if np.sum(block) == 0 or np.isnan(block).all(): + return zero_block[rows, cols], rows, cols + + out_avg = sim_function(ifg_stack=block, search_radius=search_radius) + logger.debug(f"{rows = }, {cols = }, {block.shape = }, {out_avg.shape = }") + return out_avg, rows, cols + + out_dir = Path(output_file).parent + reader = VRTStack(ifg_file_list, outfile=out_dir / "sim_inputs.vrt") + if reader.dtype not in (np.complex64, np.complex128): + raise ValueError( + f"ifg_file_list must be complex interferograms. Got {reader.dtype}" + ) + + writer = BackgroundRasterWriter( + output_file, + like_filename=ifg_file_list[0], + dtype="float32", + driver="GTiff", + ) + process_blocks( + [reader], + writer, + func=calc_sim, + block_shape=block_shape, + num_threads=num_threads, + ) + writer.notify_finished() + + if add_overviews: + logger.info("Creating overviews for unwrapped images") + create_image_overviews(Path(output_file), resampling=Resampling.AVERAGE) diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py index 0c7fd1404..176e650cb 100644 --- a/src/dolphin/timeseries.py +++ b/src/dolphin/timeseries.py @@ -1,5 +1,4 @@ import logging -from concurrent.futures import Future, ThreadPoolExecutor from pathlib import Path from tempfile import NamedTemporaryFile from typing import Callable, Protocol, Sequence, TypeVar @@ -10,12 +9,11 @@ from numpy.typing import ArrayLike from opera_utils import get_dates from scipy import ndimage -from tqdm.auto import tqdm from dolphin import DateOrDatetime, io from dolphin._overviews import ImageType, create_overviews from dolphin._types import PathOrStr, ReferencePoint -from dolphin.utils import DummyProcessPoolExecutor, flatten, format_dates +from dolphin.utils import flatten, format_dates __all__ = [ "invert_stack", @@ -270,51 +268,6 @@ def datetime_to_float(dates: Sequence[DateOrDatetime]) -> np.ndarray: return date_arr.astype(float) / sec_per_day -class BlockProcessor(Protocol): - """Protocol for a block-wise processing function. - - Reads a block of data from each reader, processes it, and returns the result - as an array-like object. - """ - - def __call__( - self, readers: Sequence[io.StackReader], rows: slice, cols: slice - ) -> ArrayLike: - ... - - -def process_blocks( - readers: Sequence[io.StackReader], - writer: io.DatasetWriter, - func: BlockProcessor, - # output_files: Sequence[PathOrStr], - # like_filename: PathOrStr | None = None, - block_shape: tuple[int, int] = (512, 512), - num_threads: int = 5, -): - """Perform block-wise processing over blocks in `readers`, writing to `writer`. - - Extracts the common functionality from several routines to read and process - a stack of rasters in parallel. - """ - shape = readers[0].shape[-2:] - slices = list(io.iter_blocks(shape, block_shape=block_shape)) - - pbar = tqdm(total=len(slices)) - - # Define the callback to write the result to an output DatasetWrite - def write_callback(fut: Future): - rows, cols, data = fut.result() - writer[..., rows, cols] = data - pbar.update() - - Executor = ThreadPoolExecutor if num_threads > 1 else DummyProcessPoolExecutor - with Executor(num_threads) as exc: - for rows, cols in slices: - future = exc.submit(func, readers=readers, rows=rows, cols=cols) - future.add_done_callback(write_callback) - - def create_velocity( unw_file_list: Sequence[PathOrStr], output_file: PathOrStr, @@ -392,7 +345,7 @@ def create_velocity( def read_and_fit( readers: Sequence[io.StackReader], rows: slice, cols: slice - ) -> tuple[slice, slice, np.ndarray]: + ) -> tuple[np.ndarray, slice, slice]: # Only use the cor_reader if it's the same shape as the unw_reader if len(readers) == 2: unw_reader, cor_reader = readers @@ -406,9 +359,9 @@ def read_and_fit( unw_stack = unw_stack - ref_data # Fit a line to each pixel with weighted least squares return ( + estimate_velocity(x_arr=x_arr, unw_stack=unw_stack, weight_stack=weights), rows, cols, - estimate_velocity(x_arr=x_arr, unw_stack=unw_stack, weight_stack=weights), ) # Note: For some reason, the `RasterStackReader` is much slower than the VRT @@ -421,7 +374,7 @@ def read_and_fit( readers.append(cor_reader) writer = io.BackgroundRasterWriter(output_file, like_filename=unw_file_list[0]) - process_blocks( + io.process_blocks( readers=readers, writer=writer, func=read_and_fit, @@ -477,7 +430,7 @@ def read_and_average( readers: Sequence[io.StackReader], rows: slice, cols: slice ) -> tuple[slice, slice, np.ndarray]: chunk = readers[0][:, rows, cols] - return rows, cols, average_func(chunk, 0) + return average_func(chunk, 0), rows, cols writer = io.BackgroundRasterWriter(output_file, like_filename=file_list[0]) with NamedTemporaryFile(mode="w", suffix=".vrt") as f: @@ -488,7 +441,7 @@ def read_and_average( read_masked=read_masked, ) - process_blocks( + io.process_blocks( readers=[reader], writer=writer, func=read_and_average, @@ -604,7 +557,7 @@ def read_and_solve( # TODO: do i want to write residuals too? Do i need # to have multiple writers then? phases = invert_stack(A, stack, weights)[0] - return rows, cols, np.asarray(phases) + return np.asarray(phases), rows, cols if cor_file_list is not None: cor_reader = io.VRTStack( @@ -616,7 +569,7 @@ def read_and_solve( writer = io.BackgroundStackWriter(out_paths, like_filename=unw_file_list[0]) - process_blocks( + io.process_blocks( readers=readers, writer=writer, func=read_and_solve, diff --git a/src/dolphin/workflows/single.py b/src/dolphin/workflows/single.py index 21fcd374a..9e3b4a5f3 100644 --- a/src/dolphin/workflows/single.py +++ b/src/dolphin/workflows/single.py @@ -1,13 +1,4 @@ -"""Estimate wrapped phase for one ministack of SLCs. - -References ----------- - .. [1] Mirzaee, Sara, Falk Amelung, and Heresh Fattahi. "Non-linear phase - linking using joined distributed and persistent scatterers." Computers & - Geosciences (2022): 105291. - - -""" +"""Estimate wrapped phase for one ministack of SLCs.""" from __future__ import annotations from dataclasses import dataclass diff --git a/tests/test_similarity.py b/tests/test_similarity.py new file mode 100644 index 000000000..f6b90a4db --- /dev/null +++ b/tests/test_similarity.py @@ -0,0 +1,97 @@ +import numpy as np +import pytest + +from dolphin import similarity + +# Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. +pytestmark = pytest.mark.filterwarnings( + "ignore::rasterio.errors.NotGeoreferencedWarning", +) + + +def test_get_circle_idxs(): + idxs = similarity.get_circle_idxs(3) + expected = np.array( + [ + [-2, -1], + [-2, 0], + [-2, 1], + [-1, -2], + [-1, -1], + [-1, 0], + [-1, 1], + [-1, 2], + [0, -2], + [0, -1], + [0, 1], + [0, 2], + [1, -2], + [1, -1], + [1, 0], + [1, 1], + [1, 2], + [2, -1], + [2, 0], + [2, 1], + ] + ) + np.testing.assert_array_equal(idxs, expected) + + +def test_pixel_similarity(): + x1, x2 = np.exp(1j * np.angle(np.random.randn(2, 50) + np.random.randn(2, 50) * 1j)) + sim = similarity.phase_similarity(x1[:5], x2[:5]) + assert -1 <= sim <= 1 + + # For random noise, the similarity should be closer to zero with more data + sim_full = similarity.phase_similarity(x1, x2) + assert np.abs(sim_full) < np.abs(sim) + + # self similarity == 1 + assert similarity.phase_similarity(x1, x1) == 1 + + +class TestMedianSimilarity: + @pytest.fixture + def ifg_stack(self, slc_stack): + return slc_stack * slc_stack[[0]].conj() + + @pytest.mark.parametrize("radius", [2, 5, 9]) + def test_basic(self, ifg_stack, radius): + sim = similarity.median_similarity(ifg_stack, search_radius=radius) + assert np.all(sim > -1) + assert np.all(sim < 1) + + @pytest.mark.parametrize("radius", [2, 5, 9]) + def test_median_similarity_masked(self, ifg_stack, radius): + rows, cols = ifg_stack.shape[-2:] + mask = np.random.rand(rows, cols).round().astype(bool) + sim = similarity.median_similarity(ifg_stack, search_radius=radius, mask=mask) + assert np.all(sim > -1) + assert np.all(sim < 1) + + def test_create_similarity(self, tmp_path, slc_file_list): + outfile = tmp_path / "med_sim.tif" + similarity.create_similarities( + slc_file_list, output_file=outfile, num_threads=1, block_shape=(64, 64) + ) + + +class TestMaxSimilarity: + @pytest.fixture + def ifg_stack(self, slc_stack): + return slc_stack * slc_stack[[0]].conj() + + @pytest.mark.parametrize("radius", [2, 5, 9]) + def test_basic(self, ifg_stack, radius): + sim = similarity.max_similarity(ifg_stack, search_radius=radius) + assert np.all(sim > -1) + assert np.all(sim < 1) + + @pytest.mark.parametrize("radius", [2, 5, 9]) + def test_max_similarity_masked(self, ifg_stack, radius): + rows, cols = ifg_stack.shape[-2:] + mask = np.random.rand(rows, cols).round().astype(bool) + sim = similarity.max_similarity(ifg_stack, search_radius=radius, mask=mask) + assert np.all(sim >= -1) + assert np.all(sim <= 1) diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 279935c6e..6bfba6c2f 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -171,8 +171,6 @@ def unw_files(self, tmp_path, data, raster_100_by_200): return out def test_invert_unw_network(self, data, unw_files, tmp_path): - """""" - output_dir = tmp_path / "output" output_dir.mkdir() ref_point = (0, 0)