diff --git a/xrspatial/geotiff/_gpu_decode.py b/xrspatial/geotiff/_gpu_decode.py index 205689641..db8e6cfb1 100644 --- a/xrspatial/geotiff/_gpu_decode.py +++ b/xrspatial/geotiff/_gpu_decode.py @@ -2453,12 +2453,31 @@ class _DeflateCompOpts(ctypes.Structure): return None max_cs = max_comp_size.value - # Allocate compressed output buffers on device - d_comp_bufs = [cupy.empty(max_cs, dtype=cupy.uint8) for _ in range(n_tiles)] + # Allocate one contiguous device buffer of size ``n_tiles * + # max_cs`` and view ``n_tiles`` per-tile slabs into it. This + # mirrors the single-buffer pattern already used by the decode + # path in ``_try_nvcomp_from_device_bufs`` (#1659) and the GDS + # fallback in ``gpu_decode_tiles_from_file`` (#1552). Per-tile + # ``cupy.empty`` calls each round-trip through the memory pool + # and add up to milliseconds of overhead for thousand-tile + # writes; one allocation + pointer arithmetic skips that + # bookkeeping. The slab views are non-overlapping so nvCOMP + # can write into them in parallel via the per-tile pointer + # array. See issue #1712. + _check_gpu_memory(n_tiles * max_cs, + what="nvCOMP compressed-output buffer") + d_comp_pool = cupy.empty(n_tiles * max_cs, dtype=cupy.uint8) + comp_base_ptr = int(d_comp_pool.data.ptr) + d_comp_bufs = [ + d_comp_pool[i * max_cs:(i + 1) * max_cs] + for i in range(n_tiles) + ] + comp_ptrs_host = np.arange(n_tiles, dtype=np.uint64) * np.uint64(max_cs) + comp_ptrs_host += np.uint64(comp_base_ptr) # Build pointer and size arrays d_uncomp_ptrs = cupy.array([b.data.ptr for b in d_tile_bufs], dtype=cupy.uint64) - d_comp_ptrs = cupy.array([b.data.ptr for b in d_comp_bufs], dtype=cupy.uint64) + d_comp_ptrs = cupy.asarray(comp_ptrs_host) d_uncomp_sizes = cupy.full(n_tiles, tile_bytes, dtype=cupy.uint64) d_comp_sizes = cupy.empty(n_tiles, dtype=cupy.uint64) @@ -2518,18 +2537,47 @@ class _DeflateCompOpts(ctypes.Structure): adler_checksums[i] = zlib.adler32( host_view[i * tile_bytes:(i + 1) * tile_bytes]) - # Read compressed sizes and data back to CPU + # Read compressed sizes and data back to CPU. + # + # Previously this loop ran one ``.get()`` per tile, which + # serialised on the default stream and burned a per-DMA setup + # cost (issue #1712, same pattern as the decode-side fix in + # #1552). Instead, hoist the variable-size slabs into a + # contiguous buffer with one ``cupy.concatenate``, do one D2H + # transfer, then slice the host buffer per tile. comp_sizes = d_comp_sizes.get().astype(int) + if n_tiles == 0: + return [] + # Each tile's compressed payload lives in the first + # ``comp_sizes[i]`` bytes of its ``max_cs``-sized slab. Build a + # list of trimmed views and concatenate into one packed device + # buffer; the host then sees one DMA-sized region. + trimmed = [ + d_comp_bufs[i][:int(comp_sizes[i])] for i in range(n_tiles) + ] + # The concat allocates a fresh device buffer of ``sum(comp_sizes)`` + # bytes -- a peak-VRAM bump on top of the ``n_tiles * max_cs`` + # pool above. Guard it explicitly so an OOM here surfaces with + # the same message style as ``_batched_d2h_to_bytes`` rather + # than silently triggering the broad except below. + _check_gpu_memory(int(sum(comp_sizes)), + what="nvCOMP batch staging buffer") + d_comp_concat = cupy.concatenate(trimmed) + host_concat = bytes(d_comp_concat.get()) + + # Walk host_concat by per-tile offsets to peel out each tile's + # compressed bytes. Cumulative-sum on the host avoids a second + # device round-trip. + offsets = np.zeros(n_tiles + 1, dtype=np.int64) + np.cumsum(comp_sizes, out=offsets[1:]) + result = [] for i in range(n_tiles): - cs = int(comp_sizes[i]) - raw = d_comp_bufs[i][:cs].get().tobytes() - + raw = host_concat[offsets[i]:offsets[i + 1]] if adler_checksums is not None: # Wrap raw deflate in zlib format: header + data + adler32 checksum = struct.pack('>I', adler_checksums[i] & 0xFFFFFFFF) raw = b'\x78\x9c' + raw + checksum - result.append(raw) return result diff --git a/xrspatial/geotiff/tests/test_nvcomp_batch_compress_batched_1712.py b/xrspatial/geotiff/tests/test_nvcomp_batch_compress_batched_1712.py new file mode 100644 index 000000000..c4a1fc815 --- /dev/null +++ b/xrspatial/geotiff/tests/test_nvcomp_batch_compress_batched_1712.py @@ -0,0 +1,157 @@ +"""Coverage for the batched ``_nvcomp_batch_compress`` (#1712). + +The pre-fix function allocated compressed-output device buffers one +``cupy.empty`` per tile and then read each tile back to host with one +``.get()`` per tile. Both patterns serialised on the default CUDA +stream and were dominant in large-N writes. The fix folds both into a +single contiguous device allocation + a single batched D2H concat-and- +``.get()``, matching the patterns already in use on the decode side +(#1552, #1659). + +These tests pin the new shape and confirm the deflate / zstd GPU write +paths still round-trip end-to-end. +""" +from __future__ import annotations + +import importlib.util +import inspect +import os +import tempfile + +import numpy as np +import pytest +import xarray as xr + +try: + import cupy + _HAS_CUPY = True +except Exception: + _HAS_CUPY = False + + +def _gpu_available() -> bool: + """Match the geotiff-test convention: cupy import AND working CUDA. + + A host can have cupy installed without a usable CUDA runtime (no + driver, no device visible, container misconfig), and in that case + every test that calls into the GPU writer would fail rather than + skip. ``cupy.cuda.is_available()`` is the cheap probe. + """ + if importlib.util.find_spec("cupy") is None: + return False + try: + import cupy + return bool(cupy.cuda.is_available()) + except Exception: + return False + + +_HAS_GPU = _gpu_available() + +# nvCOMP is the entry point that exercises this code path. +from xrspatial.geotiff import _gpu_decode + +needs_cupy = pytest.mark.skipif( + not _HAS_GPU, reason="cupy + CUDA required" +) + + +# ---------------------------------------------------------------------- +# Source-level structural assertions -- run on any host with the source +# available, no GPU required. +# ---------------------------------------------------------------------- + +def test_no_per_tile_cupy_empty_in_compressed_pool(): + """The per-tile cupy.empty list comprehension is gone (#1712). + + The fix replaced it with a single contiguous allocation. Catch any + regression that brings the loop back. + """ + source = inspect.getsource(_gpu_decode._nvcomp_batch_compress) + assert "cupy.empty(max_cs, dtype=cupy.uint8) for _ in range" not in source, ( + "_nvcomp_batch_compress regressed to per-tile cupy.empty " + "allocations for the compressed output pool. See #1712." + ) + + +def test_no_per_tile_get_in_result_loop(): + """The per-tile ``d_comp_bufs[i][:cs].get().tobytes()`` is gone (#1712). + + The fix replaced it with one concat + one ``.get()``. Catch any + regression that brings the per-tile pattern back. + """ + source = inspect.getsource(_gpu_decode._nvcomp_batch_compress) + # The exact string the prior loop used: + bad_fragment = "d_comp_bufs[i][:cs].get().tobytes()" + assert bad_fragment not in source, ( + "_nvcomp_batch_compress regressed to per-tile .get().tobytes() " + "D2H readback. See #1712." + ) + + +# ---------------------------------------------------------------------- +# End-to-end behaviour: GPU write + read round-trip stays correct +# ---------------------------------------------------------------------- + +@needs_cupy +@pytest.mark.parametrize("compression", ["deflate", "zstd"]) +def test_gpu_write_roundtrip_after_batched_compress(compression): + """GPU compress path round-trips uncorrupted for deflate + zstd. + + Catches the most likely regression mode: any off-by-one in the + cumulative-sum offsets used to slice the host-side concatenated + buffer would scramble tile order, which a round-trip equality + check picks up immediately. + """ + from xrspatial.geotiff import write_geotiff_gpu, open_geotiff + + rng = np.random.default_rng(seed=1712) + arr_cpu = rng.random((512, 512), dtype=np.float32) + arr_gpu = cupy.asarray(arr_cpu) + darr = xr.DataArray(arr_gpu, dims=["y", "x"]) + + with tempfile.TemporaryDirectory(prefix="nvcomp_batch_1712_") as td: + path = os.path.join(td, f"roundtrip_{compression}.tif") + try: + write_geotiff_gpu( + darr, path, + compression=compression, + tiled=True, + tile_size=64, + ) + except RuntimeError as e: + # nvCOMP may be unavailable in this environment; the writer + # falls back to CPU and that path doesn't exercise the + # change. Skip rather than fail. + pytest.skip(f"nvCOMP unavailable for {compression}: {e}") + + back = open_geotiff(path) + np.testing.assert_allclose(back.values, arr_cpu, rtol=0, atol=0) + + +@needs_cupy +def test_gpu_write_zero_tile_edge_case(): + """A 0-tile compress returns an empty list without indexing into None. + + The cumulative-sum / concat path must short-circuit before + ``cupy.concatenate`` (which would raise on an empty list). The + pre-fix loop simply iterated zero times, so the contract is the + same empty-list output. + """ + # Direct call into the internal function with n_tiles=0 is + # awkward because it needs a libnvCOMP handle and matching opts. + # Instead, exercise the public writer with a tiny single-tile + # input and confirm the fast path does not crash. Real n_tiles==0 + # never occurs via the writer (every image has at least one tile). + from xrspatial.geotiff import write_geotiff_gpu, open_geotiff + arr_gpu = cupy.zeros((32, 32), dtype=cupy.float32) + darr = xr.DataArray(arr_gpu, dims=["y", "x"]) + with tempfile.TemporaryDirectory(prefix="nvcomp_batch_1712_") as td: + path = os.path.join(td, "tiny.tif") + try: + write_geotiff_gpu(darr, path, compression="zstd", + tiled=True, tile_size=32) + except RuntimeError as e: + pytest.skip(f"nvCOMP unavailable: {e}") + back = open_geotiff(path) + assert back.shape == (32, 32)