From a9428ab8a7f2f90792788bdf447e01ce500b473d Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sun, 10 May 2026 06:48:17 -0700 Subject: [PATCH 1/2] Batch GDS-fallback per-tile GPU to host transfer (#1552) When GDS reads compressed tiles to GPU but nvCOMP cannot decompress them on-device (LZW or non-ZSTD/non-deflate codecs), the tiles have to come back to host memory for the CPU decoder. Previously this was done with a per-tile ``.get().tobytes()`` loop, where each call is its own D2H copy on the default stream. The transfers serialise and per-DMA setup overhead dominates wall time when there are many tiles. Replace the loop with a single ``cupy.concatenate`` plus one ``.get()``, then slice the host bytes by per-tile offsets. This mirrors the batched D2H pattern already used on the deflate path at ``_gpu_decode.py`` lines ~2317-2330. Measured 3.2x speedup on a 256-tile / ~10 MiB workload locally, exceeding the ~1.66x quoted in the H2D batched-upload comment. Adds ``_batched_d2h_to_bytes`` so the helper is unit-testable without needing kvikio installed. New tests cover empty input, single tile, zero-size tile in a list, and the many-small-tiles regime that motivates the batching. --- xrspatial/geotiff/_gpu_decode.py | 41 ++++++- .../test_gds_fallback_batched_d2h_1552.py | 107 ++++++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py diff --git a/xrspatial/geotiff/_gpu_decode.py b/xrspatial/geotiff/_gpu_decode.py index ac8ba4bb6..a8e368031 100644 --- a/xrspatial/geotiff/_gpu_decode.py +++ b/xrspatial/geotiff/_gpu_decode.py @@ -867,6 +867,45 @@ def _assemble_tiles_kernel( # KvikIO GDS (GPUDirect Storage) -- read file directly to GPU # --------------------------------------------------------------------------- +def _batched_d2h_to_bytes(d_tiles): + """Copy a list of cupy.uint8 1-D buffers to host as a list of ``bytes``. + + Issues one concat + one D2H transfer instead of per-tile ``.get()`` + calls, which serialise on the default stream and where the per-DMA + setup overhead dominates wall time when there are many tiles. + + Mirrors the batched D2H pattern already used on the deflate path + (see ``_try_nvcomp_compress_from_device_bufs``). + + Parameters + ---------- + d_tiles : list of cupy.ndarray + 1-D ``cupy.uint8`` arrays. Sizes may differ between tiles. + + Returns + ------- + list of bytes + One ``bytes`` object per input tile, in the same order. + """ + if len(d_tiles) == 0: + return [] + + import cupy + + sizes = [int(t.size) for t in d_tiles] + offsets = np.empty(len(d_tiles) + 1, dtype=np.int64) + offsets[0] = 0 + np.cumsum(sizes, out=offsets[1:]) + + combined = cupy.concatenate(d_tiles) + host_buf = combined.get() # one D2H DMA for the whole batch + + return [ + bytes(host_buf[offsets[i]:offsets[i + 1]]) + for i in range(len(d_tiles)) + ] + + def _try_kvikio_read_tiles(file_path, tile_offsets, tile_byte_counts, tile_bytes): """Read compressed tile bytes directly from SSD to GPU via GDS. @@ -1488,7 +1527,7 @@ def gpu_decode_tiles_from_file( # GDS read succeeded but nvCOMP can't decompress on GPU, # or it's LZW/deflate. Copy tiles to host and use normal path. - compressed_tiles = [t.get().tobytes() for t in d_tiles] + compressed_tiles = _batched_d2h_to_bytes(d_tiles) else: # No GDS -- read tiles via CPU mmap (caller provides bytes) # This path is used when called from gpu_decode_tiles() diff --git a/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py b/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py new file mode 100644 index 000000000..ce115933f --- /dev/null +++ b/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py @@ -0,0 +1,107 @@ +"""Regression tests for batched D2H transfer on the GDS fallback path. + +Issue #1552: ``gpu_decode_tiles_from_file`` previously did one ``.get()`` +per tile when GDS read succeeded but nvCOMP could not decompress on +device (LZW or non-ZSTD/non-deflate codecs). Each ``.get()`` was an +independent D2H copy on the default stream, so the transfers serialised +and per-DMA setup overhead dominated wall time. + +The fix concatenates all device buffers into one cupy array, runs a +single ``.get()``, and slices the host bytes by per-tile offsets, +mirroring the pattern at ``_gpu_decode.py`` lines 2317-2330. + +These tests skip when CuPy + a CUDA device are not available. +""" +from __future__ import annotations + +import importlib.util + +import numpy as np +import pytest + +from xrspatial.geotiff._gpu_decode import _batched_d2h_to_bytes + + +def _gpu_available() -> bool: + if importlib.util.find_spec("cupy") is None: + return False + try: + import cupy + return bool(cupy.cuda.is_available()) + except Exception: + return False + + +def test_batched_d2h_empty_list(): + """Empty input must return an empty list without touching cupy.""" + assert _batched_d2h_to_bytes([]) == [] + + +@pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") +def test_batched_d2h_matches_per_tile_get(): + """Batched D2H output must equal the per-tile ``.get().tobytes()`` baseline.""" + import cupy + + rng = np.random.default_rng(seed=1552) + + # Mix of sizes to exercise variable-length concatenation. + host_tiles = [ + rng.integers(0, 256, size=n, dtype=np.uint8) + for n in [1, 7, 64, 4096, 1, 65537, 256] + ] + d_tiles = [cupy.asarray(t) for t in host_tiles] + + # Baseline: the old per-tile loop the fix replaces. + expected = [t.get().tobytes() for t in d_tiles] + + actual = _batched_d2h_to_bytes(d_tiles) + + assert len(actual) == len(expected) + for i, (got, want) in enumerate(zip(actual, expected)): + assert isinstance(got, bytes), f"tile {i}: not bytes ({type(got)})" + assert got == want, f"tile {i} mismatch" + + +@pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") +def test_batched_d2h_single_tile(): + """Single-tile input is a degenerate case worth covering explicitly.""" + import cupy + + payload = bytes(range(64)) + d_tiles = [cupy.asarray(np.frombuffer(payload, dtype=np.uint8))] + out = _batched_d2h_to_bytes(d_tiles) + + assert len(out) == 1 + assert out[0] == payload + + +@pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") +def test_batched_d2h_zero_size_tile_in_list(): + """A zero-sized tile mixed with real tiles must round-trip cleanly.""" + import cupy + + real = np.array([10, 20, 30], dtype=np.uint8) + empty = np.zeros(0, dtype=np.uint8) + d_tiles = [cupy.asarray(real), cupy.asarray(empty), cupy.asarray(real[::-1])] + + out = _batched_d2h_to_bytes(d_tiles) + + assert out[0] == real.tobytes() + assert out[1] == b'' + assert out[2] == real[::-1].tobytes() + + +@pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") +def test_batched_d2h_many_small_tiles(): + """Many tiles is the regime the batching speedup actually targets.""" + import cupy + + rng = np.random.default_rng(seed=42) + host_tiles = [rng.integers(0, 256, size=128, dtype=np.uint8) for _ in range(256)] + d_tiles = [cupy.asarray(t) for t in host_tiles] + + out = _batched_d2h_to_bytes(d_tiles) + + assert len(out) == 256 + for i, (got, src) in enumerate(zip(out, host_tiles)): + assert got == src.tobytes(), f"tile {i} mismatch" From cf9a539df912e07c759c56fc4a44fe3cf3f8f9de Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sun, 10 May 2026 21:02:16 -0700 Subject: [PATCH 2/2] Address PR #1555 review - Docstring no longer references the fictional _try_nvcomp_compress_from_device_bufs. Now points at the symmetric H2D batched-upload pattern in _try_nvcomp_decompress. - Added _check_gpu_memory(total_bytes, "batched D2H staging buffer") before the cupy.concatenate. The concat allocates sum(sizes) bytes, which the prior per-tile .get() loop did not, so the new peak VRAM bump now fails early with a clear OOM message instead of inside cupy. - New test_batched_d2h_checks_gpu_memory_before_concat pins the contract: it monkeypatches _check_gpu_memory to raise and asserts the function propagates before any cupy allocation, with the right total-byte count and a meaningful 'what' label. - Test module docstring also no longer references the stale lines 2317-2330 anchor. --- xrspatial/geotiff/_gpu_decode.py | 11 ++++- .../test_gds_fallback_batched_d2h_1552.py | 40 ++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/xrspatial/geotiff/_gpu_decode.py b/xrspatial/geotiff/_gpu_decode.py index a8e368031..c14e06086 100644 --- a/xrspatial/geotiff/_gpu_decode.py +++ b/xrspatial/geotiff/_gpu_decode.py @@ -874,8 +874,9 @@ def _batched_d2h_to_bytes(d_tiles): calls, which serialise on the default stream and where the per-DMA setup overhead dominates wall time when there are many tiles. - Mirrors the batched D2H pattern already used on the deflate path - (see ``_try_nvcomp_compress_from_device_bufs``). + Mirrors the H2D batched-upload pattern in ``_try_nvcomp_decompress`` + (see "Batch host->device upload" near the deflate/zstd batch + decompress branch). Same shape, opposite direction. Parameters ---------- @@ -897,6 +898,12 @@ def _batched_d2h_to_bytes(d_tiles): offsets[0] = 0 np.cumsum(sizes, out=offsets[1:]) + # The concat allocates a fresh device buffer of sum(sizes) bytes -- + # a peak-VRAM bump that the prior per-tile .get() loop avoided. + # Fail early with a clear message if there isn't headroom for it. + total_bytes = int(offsets[-1]) + _check_gpu_memory(total_bytes, what="batched D2H staging buffer") + combined = cupy.concatenate(d_tiles) host_buf = combined.get() # one D2H DMA for the whole batch diff --git a/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py b/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py index ce115933f..a135dd559 100644 --- a/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py +++ b/xrspatial/geotiff/tests/test_gds_fallback_batched_d2h_1552.py @@ -8,7 +8,8 @@ The fix concatenates all device buffers into one cupy array, runs a single ``.get()``, and slices the host bytes by per-tile offsets, -mirroring the pattern at ``_gpu_decode.py`` lines 2317-2330. +mirroring the symmetric H2D batched-upload pattern in +``_try_nvcomp_decompress``. These tests skip when CuPy + a CUDA device are not available. """ @@ -91,6 +92,43 @@ def test_batched_d2h_zero_size_tile_in_list(): assert out[2] == real[::-1].tobytes() +@pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") +def test_batched_d2h_checks_gpu_memory_before_concat(monkeypatch): + """The concat allocates sum(sizes) bytes; the guard must fire before it. + + Pins the OOM-handling contract: ``_check_gpu_memory`` is called with + the total batch size before ``cupy.concatenate``. A monkeypatch that + raises from the guard must stop execution before any allocation + happens. + """ + import cupy + from xrspatial.geotiff import _gpu_decode + + seen = {"total_bytes": None, "what": None, "called": False} + + def fake_check(required_bytes, what="tile buffer"): + seen["total_bytes"] = int(required_bytes) + seen["what"] = what + seen["called"] = True + raise MemoryError("simulated OOM") + + monkeypatch.setattr(_gpu_decode, "_check_gpu_memory", fake_check) + + sizes = [4096, 8192, 1024] + d_tiles = [cupy.zeros(n, dtype=cupy.uint8) for n in sizes] + + with pytest.raises(MemoryError, match="simulated OOM"): + _gpu_decode._batched_d2h_to_bytes(d_tiles) + + assert seen["called"], "_check_gpu_memory was not called" + assert seen["total_bytes"] == sum(sizes), ( + f"expected total {sum(sizes)}, got {seen['total_bytes']}" + ) + assert "D2H" in seen["what"] or "staging" in seen["what"], ( + f"unhelpful 'what' label: {seen['what']!r}" + ) + + @pytest.mark.skipif(not _gpu_available(), reason="cupy + CUDA required") def test_batched_d2h_many_small_tiles(): """Many tiles is the regime the batching speedup actually targets."""