diff --git a/.claude/sweep-performance-state.csv b/.claude/sweep-performance-state.csv index 84868934..3d67664e 100644 --- a/.claude/sweep-performance-state.csv +++ b/.claude/sweep-performance-state.csv @@ -18,7 +18,7 @@ fire,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, flood,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, focal,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, geodesic,2026-03-31T18:00:00Z,N/A,compute-bound,0,, -geotiff,2026-05-15,SAFE,IO-bound,0,1756,"Pass 10 (2026-05-15): 1 new MEDIUM found and fixed; 2 LOW noted. MEDIUM (_reader.py:2737): _fetch_decode_cog_http_tiles decoded tiles sequentially in a Python for-loop after the concurrent fetch landed (issue #1480). Local _read_tiles parallelises decode whenever tile_pixels >= 64K via ThreadPoolExecutor (_reader.py:2017); the HTTP path was structurally similar but never picked up the same gate, so wide windowed reads of multi-tile COGs left deflate/zstd decode single-threaded. Mirrored the local-path threshold + pool. 5 new tests in test_cog_http_parallel_decode_2026_05_15.py (parallel + serial round-trip correctness, pool-instantiation branch selection above the threshold, single-tile path skips the pool, structural _decode_strip_or_tile call count == n_tiles). All 262 COG/HTTP tests pass; 3162 of 3164 selected geotiff tests pass overall (2 pre-existing failures predating Pass 9 per prior notes -- test_predictor2_big_endian_gpu_1517 references the now-private read_to_array attr, and the test_size_param_validation_gpu_vrt_1776 tile_size=4 validator failure). LOW deferred (no fix in this PR): (1) _block_reduce_2d_gpu (_gpu_decode.py:3142/3163/3189) does bool(mask.any().item()) per overview level when nodata is set, paying one device sync per level; the alternative (unconditional cupy.putmask) always pays the work cost and the short-circuit is correct under the current API. (2) _nvcomp_batch_compress adler32 staging (_gpu_decode.py:2543-2546) issues n_tiles slice-assign kernels into a fresh contig buffer despite all callers passing slices of a single underlying d_tile_buf; an API refactor to accept the source buffer directly would skip the rebuild. SAFE/IO-bound verdict holds. Dask probe: 2560x2560 chunks=256 yields 400 tasks (4 per chunk), well under the 50000 cap. GPU probe: 1024x1024 float32 zstd read returns CuPy-backed in 236 ms with no host round-trip. | Rockout 2026-05-15: LOW filed #1934 -- _apply_nodata_mask_gpu used cupy.where (allocating); switched to cupy.putmask on the already-owned buffer (float path) and on the post-astype float64 buffer (int path). Saves one chunk-sized device allocation per call. 7 new tests in test_apply_nodata_mask_gpu_inplace_1934.py; 52 related nodata tests pass. | Pass 8 (2026-05-12): 1 new MEDIUM found and fixed. _assemble_standard_layout/_assemble_cog_layout returned bytes(bytearray), doubling peak memory transiently during eager writes. Filed #1756, fixed by returning the bytearray directly. Measured: 95 MB uint8 raster peak drops 202 MB -> 107 MB. _write_bytes / parse_header already accepted the buffer protocol so the change is transparent to callers. 6 new tests in test_assemble_layout_no_bytes_copy_1756.py. 2123 existing geotiff tests pass; the 10 unrelated failures (test_no_georef_windowed_coords_1710, test_predictor2_big_endian_gpu_1517) reference the now-private read_to_array attribute (commit 8adb749, issue #1708) and predate this change. SAFE/IO-bound verdict holds. | Pass 7 (2026-05-12): re-audit identified 4 MEDIUM findings, all real, all backed by microbenches. (1) unpack_bits sub-byte loops for bps=2/4/12 in _compression.py:836-878 were 100-200x slower than vectorised numpy (filed #1713, fixed in this branch: bps=4 2M pixels drops from 165ms to 3ms = 55x; bps=2/12 similar). (2) _write_vrt_tiled at __init__.py:1708 uses scheduler='synchronous' on independent tile writes; measured 33% slowdown on 256-tile zstd write vs threads scheduler (filed #1714, no fix yet). (3) _nvcomp_batch_compress at _gpu_decode.py:2522-2526 still does per-tile cupy.get().tobytes() despite #1552 / #1659 fixing the same pattern elsewhere; measured 45% reduction with concat+single get on n=1024 (filed #1712, no fix yet). (4) _nvcomp_batch_compress at _gpu_decode.py:2457 uses per-tile cupy.empty allocations; 1024 tiles 16KB drops from 4.7ms to 1.0ms with single contiguous + views (bundled into #1712). Cat 6 OOM verdict: SAFE/IO-bound holds -- read_geotiff_dask caps task count at _MAX_DASK_CHUNKS=50_000 and per-chunk memory is bounded by chunk size. _inflate_tiles_kernel resource usage on Ampere: 67 regs/thread, 2896B local/thread, 8192B shared/block (LZW kernel: 29 regs, 24576B shared) -- register pressure under control; high local memory in inflate is unavoidable (LZ77 state) but only thread 0 in each block uses it. | Pass 4 (2026-05-10): re-audit after #1559 (centralise attrs across all read backends). New _populate_attrs_from_geo_info helper at __init__.py:301 runs once per read, not per-chunk -- no perf impact. Probe: 2560x2560 deflate-tiled file opened via read_geotiff_dask yields 400 tasks (4 tasks/chunk for 100 chunks), well under 1M cap. read_geotiff_gpu(1024x1024) returns cupy.ndarray end-to-end with no host round-trip (226ms incl. write+decode). No new HIGH/MEDIUM findings. SAFE/IO-bound holds. | Pass 3 (2026-05-10): SAFE/IO-bound. Audited 4 perf commits: #1558 (in-place NaN writes on uniquely-owned buffers correct), #1556 (fp-predictor ngjit ~297us/tile for 256x256 float32), #1552 (single cupy.concatenate + one .get() for batched D2H at _gpu_decode.py:870-913), #1551 (parallel decode threshold >=65536px engages 256x256 default at _reader.py:1121). Bench: 8192x8192 f32 deflate+pred2 256-tile write 782ms; 4096x4096 f32 deflate read 83ms with parallel decode. Deferred LOW (none filed, all <10% MEDIUM threshold): _writer.py:459/1109 redundant .copy() before predictor encode (~1% per tile), _compression.py:280 lzw_decompress dst[:n].copy() (~2% per LZW tile decode), _writer.py:1419 seg_np.copy() before in-place NaN substitution (negligible, conditional path), _CloudSource.read_range opens fresh fsspec handle per range (pre-existing, predates audit scope). nvCOMP per-tile D2H batching break-even confirmed (variable sizes need staging buffer, no win). | Pass 3 (2026-05-10): audited f157746,39322c3,f23ec8f,1aac3b7. All 5 commits correct. Redundant .copy() in _writer.py:459,1109 and _compression.py:280 (1-2% overhead, LOW). _CloudSource.read_range() per-call open is pre-existing arch issue. No HIGH/MEDIUM regressions. SAFE. | re-audit 2026-05-02: 6 commits since 2026-04-16 (predictor=3 CPU encode/decode, GPU predictor stride fix, validate_tile_layout, BigTIFF LONG8 offsets, AREA_OR_POINT VRT, per-tile alloc guard). 1M dask chunk cap intact at __init__.py:948; adler32 batch transfer intact at _gpu_decode.py:1825. New code is metadata validation and dispatcher logic with no extra materialization or per-tile sync points. No HIGH/MEDIUM regressions. | Pass 5 (2026-05-12): re-audit identified MEDIUM in _gpu_decode.py:1577 _try_nvcomp_from_device_bufs: per-tile cupy.empty + trailing cupy.concatenate doubled peak VRAM and added serial concat. Filed #1659 and fixed to single-buffer + pointer offsets (matches LZW/deflate/host-buffer patterns at L1847/L1878/L1114). Microbench (alloc+concat overhead only, not full nvCOMP latency): n=256 tile_bytes=65536 drops 3.66ms->0.69ms, n=256 tile_bytes=262144 drops 8.18ms->0.13ms. Tests: 5 new tests in test_nvcomp_from_device_bufs_single_alloc_1659.py (codec short-circuit, no-lib short-circuit, memory-guard contract, real ZSTD round-trip via nvCOMP, structural single-buffer check). 1458 existing geotiff tests pass, 3 unrelated matplotlib/py3.14 failures pre-existing. SAFE/IO-bound verdict holds. | Pass 6 (2026-05-12): re-audit on top of #1659. New HIGH in _try_kvikio_read_tiles at _gpu_decode.py:941: per-tile cupy.empty() + blocking IOFuture.get() inside loop serialised GDS reads to ~1 outstanding pread, missed parallelism the kvikio worker pool was designed for, paid per-tile cupy.empty setup (matches #1659 anti-pattern in nvCOMP path), and lacked _check_gpu_memory guard. Filed #1688 and fixed to single contiguous buffer + batched submit + guard. Microbench with 8-worker pool simulation: 256 tiles@1ms latency drops 256ms->38.7ms (~6.6x); single-thread simulation 256ms->28.5ms (9x). Tests: 9 new tests in test_kvikio_batched_pread_1688.py (kvikio-absent path, single-buffer pointer arithmetic, submit-before-get ordering, memory guard, partial-read fallback, round-trip data, zero-size/all-sparse tiles). All 1577 geotiff tests pass except pre-existing matplotlib/py3.14 failures." +geotiff,2026-05-18,SAFE,IO-bound,0,2100,"Pass 11 (2026-05-18): 1 MEDIUM found and fixed. _read_strips (_reader.py:~L1972) and _fetch_decode_cog_http_strips (_reader.py:~L2670) decoded strips sequentially in a Python for-loop while the tile counterparts (_read_tiles L2146, _fetch_decode_cog_http_tiles L2898) gated parallel decode on _PARALLEL_DECODE_PIXEL_THRESHOLD via ThreadPoolExecutor. Filed #2100 and fixed: both strip paths now collect jobs, parallel-decode when n_strips > 1 and strip_pixels >= 64K, then place sequentially. Measured (uint16, 4-core): 4096x4096 deflate 130ms->34ms (3.82x), 8192x8192 deflate 531ms->146ms (3.63x), 8192x8192 zstd 211ms->85ms (2.48x), uncompressed 25ms->22ms (1.14x). 5 new tests in test_parallel_strip_decode_2100.py (parallel/serial parity, pool-engaged on multi-strip, serial-path for single-strip, windowed cross-strip read, HTTP COG strip parity). 3998 tests pass; 8 pre-existing failures predating this change (predictor2 BE + size_param_validation_gpu_vrt reference now-private read_to_array attr). SAFE/IO-bound verdict holds. | Pass 10 (2026-05-15): 1 new MEDIUM found and fixed; 2 LOW noted. MEDIUM (_reader.py:2737): _fetch_decode_cog_http_tiles decoded tiles sequentially in a Python for-loop after the concurrent fetch landed (issue #1480). Local _read_tiles parallelises decode whenever tile_pixels >= 64K via ThreadPoolExecutor (_reader.py:2017); the HTTP path was structurally similar but never picked up the same gate, so wide windowed reads of multi-tile COGs left deflate/zstd decode single-threaded. Mirrored the local-path threshold + pool. 5 new tests in test_cog_http_parallel_decode_2026_05_15.py (parallel + serial round-trip correctness, pool-instantiation branch selection above the threshold, single-tile path skips the pool, structural _decode_strip_or_tile call count == n_tiles). All 262 COG/HTTP tests pass; 3162 of 3164 selected geotiff tests pass overall (2 pre-existing failures predating Pass 9 per prior notes -- test_predictor2_big_endian_gpu_1517 references the now-private read_to_array attr, and the test_size_param_validation_gpu_vrt_1776 tile_size=4 validator failure). LOW deferred (no fix in this PR): (1) _block_reduce_2d_gpu (_gpu_decode.py:3142/3163/3189) does bool(mask.any().item()) per overview level when nodata is set, paying one device sync per level; the alternative (unconditional cupy.putmask) always pays the work cost and the short-circuit is correct under the current API. (2) _nvcomp_batch_compress adler32 staging (_gpu_decode.py:2543-2546) issues n_tiles slice-assign kernels into a fresh contig buffer despite all callers passing slices of a single underlying d_tile_buf; an API refactor to accept the source buffer directly would skip the rebuild. SAFE/IO-bound verdict holds. Dask probe: 2560x2560 chunks=256 yields 400 tasks (4 per chunk), well under the 50000 cap. GPU probe: 1024x1024 float32 zstd read returns CuPy-backed in 236 ms with no host round-trip. | Rockout 2026-05-15: LOW filed #1934 -- _apply_nodata_mask_gpu used cupy.where (allocating); switched to cupy.putmask on the already-owned buffer (float path) and on the post-astype float64 buffer (int path). Saves one chunk-sized device allocation per call. 7 new tests in test_apply_nodata_mask_gpu_inplace_1934.py; 52 related nodata tests pass. | Pass 8 (2026-05-12): 1 new MEDIUM found and fixed. _assemble_standard_layout/_assemble_cog_layout returned bytes(bytearray), doubling peak memory transiently during eager writes. Filed #1756, fixed by returning the bytearray directly. Measured: 95 MB uint8 raster peak drops 202 MB -> 107 MB. _write_bytes / parse_header already accepted the buffer protocol so the change is transparent to callers. 6 new tests in test_assemble_layout_no_bytes_copy_1756.py. 2123 existing geotiff tests pass; the 10 unrelated failures (test_no_georef_windowed_coords_1710, test_predictor2_big_endian_gpu_1517) reference the now-private read_to_array attribute (commit 8adb749, issue #1708) and predate this change. SAFE/IO-bound verdict holds. | Pass 7 (2026-05-12): re-audit identified 4 MEDIUM findings, all real, all backed by microbenches. (1) unpack_bits sub-byte loops for bps=2/4/12 in _compression.py:836-878 were 100-200x slower than vectorised numpy (filed #1713, fixed in this branch: bps=4 2M pixels drops from 165ms to 3ms = 55x; bps=2/12 similar). (2) _write_vrt_tiled at __init__.py:1708 uses scheduler='synchronous' on independent tile writes; measured 33% slowdown on 256-tile zstd write vs threads scheduler (filed #1714, no fix yet). (3) _nvcomp_batch_compress at _gpu_decode.py:2522-2526 still does per-tile cupy.get().tobytes() despite #1552 / #1659 fixing the same pattern elsewhere; measured 45% reduction with concat+single get on n=1024 (filed #1712, no fix yet). (4) _nvcomp_batch_compress at _gpu_decode.py:2457 uses per-tile cupy.empty allocations; 1024 tiles 16KB drops from 4.7ms to 1.0ms with single contiguous + views (bundled into #1712). Cat 6 OOM verdict: SAFE/IO-bound holds -- read_geotiff_dask caps task count at _MAX_DASK_CHUNKS=50_000 and per-chunk memory is bounded by chunk size. _inflate_tiles_kernel resource usage on Ampere: 67 regs/thread, 2896B local/thread, 8192B shared/block (LZW kernel: 29 regs, 24576B shared) -- register pressure under control; high local memory in inflate is unavoidable (LZ77 state) but only thread 0 in each block uses it. | Pass 4 (2026-05-10): re-audit after #1559 (centralise attrs across all read backends). New _populate_attrs_from_geo_info helper at __init__.py:301 runs once per read, not per-chunk -- no perf impact. Probe: 2560x2560 deflate-tiled file opened via read_geotiff_dask yields 400 tasks (4 tasks/chunk for 100 chunks), well under 1M cap. read_geotiff_gpu(1024x1024) returns cupy.ndarray end-to-end with no host round-trip (226ms incl. write+decode). No new HIGH/MEDIUM findings. SAFE/IO-bound holds. | Pass 3 (2026-05-10): SAFE/IO-bound. Audited 4 perf commits: #1558 (in-place NaN writes on uniquely-owned buffers correct), #1556 (fp-predictor ngjit ~297us/tile for 256x256 float32), #1552 (single cupy.concatenate + one .get() for batched D2H at _gpu_decode.py:870-913), #1551 (parallel decode threshold >=65536px engages 256x256 default at _reader.py:1121). Bench: 8192x8192 f32 deflate+pred2 256-tile write 782ms; 4096x4096 f32 deflate read 83ms with parallel decode. Deferred LOW (none filed, all <10% MEDIUM threshold): _writer.py:459/1109 redundant .copy() before predictor encode (~1% per tile), _compression.py:280 lzw_decompress dst[:n].copy() (~2% per LZW tile decode), _writer.py:1419 seg_np.copy() before in-place NaN substitution (negligible, conditional path), _CloudSource.read_range opens fresh fsspec handle per range (pre-existing, predates audit scope). nvCOMP per-tile D2H batching break-even confirmed (variable sizes need staging buffer, no win). | Pass 3 (2026-05-10): audited f157746,39322c3,f23ec8f,1aac3b7. All 5 commits correct. Redundant .copy() in _writer.py:459,1109 and _compression.py:280 (1-2% overhead, LOW). _CloudSource.read_range() per-call open is pre-existing arch issue. No HIGH/MEDIUM regressions. SAFE. | re-audit 2026-05-02: 6 commits since 2026-04-16 (predictor=3 CPU encode/decode, GPU predictor stride fix, validate_tile_layout, BigTIFF LONG8 offsets, AREA_OR_POINT VRT, per-tile alloc guard). 1M dask chunk cap intact at __init__.py:948; adler32 batch transfer intact at _gpu_decode.py:1825. New code is metadata validation and dispatcher logic with no extra materialization or per-tile sync points. No HIGH/MEDIUM regressions. | Pass 5 (2026-05-12): re-audit identified MEDIUM in _gpu_decode.py:1577 _try_nvcomp_from_device_bufs: per-tile cupy.empty + trailing cupy.concatenate doubled peak VRAM and added serial concat. Filed #1659 and fixed to single-buffer + pointer offsets (matches LZW/deflate/host-buffer patterns at L1847/L1878/L1114). Microbench (alloc+concat overhead only, not full nvCOMP latency): n=256 tile_bytes=65536 drops 3.66ms->0.69ms, n=256 tile_bytes=262144 drops 8.18ms->0.13ms. Tests: 5 new tests in test_nvcomp_from_device_bufs_single_alloc_1659.py (codec short-circuit, no-lib short-circuit, memory-guard contract, real ZSTD round-trip via nvCOMP, structural single-buffer check). 1458 existing geotiff tests pass, 3 unrelated matplotlib/py3.14 failures pre-existing. SAFE/IO-bound verdict holds. | Pass 6 (2026-05-12): re-audit on top of #1659. New HIGH in _try_kvikio_read_tiles at _gpu_decode.py:941: per-tile cupy.empty() + blocking IOFuture.get() inside loop serialised GDS reads to ~1 outstanding pread, missed parallelism the kvikio worker pool was designed for, paid per-tile cupy.empty setup (matches #1659 anti-pattern in nvCOMP path), and lacked _check_gpu_memory guard. Filed #1688 and fixed to single contiguous buffer + batched submit + guard. Microbench with 8-worker pool simulation: 256 tiles@1ms latency drops 256ms->38.7ms (~6.6x); single-thread simulation 256ms->28.5ms (9x). Tests: 9 new tests in test_kvikio_batched_pread_1688.py (kvikio-absent path, single-buffer pointer arithmetic, submit-before-get ordering, memory guard, partial-read fallback, round-trip data, zero-size/all-sparse tiles). All 1577 geotiff tests pass except pre-existing matplotlib/py3.14 failures." glcm,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,"Downgraded to MEDIUM. da.stack without rechunk is scheduling overhead, not OOM risk." hillshade,2026-04-16T12:00:00Z,SAFE,compute-bound,0,,"Re-audit after Horn's method rewrite (PR 1175): clean stencil, map_overlap depth=(1,1), no materialization. Zero findings." hydro,2026-05-01,RISKY,memory-bound,0,1416,"Fixed-in-tree 2026-05-01: hand_mfd._hand_mfd_dask now assembles via da.map_blocks instead of eager da.block of pre-computed tiles (matches hand_dinf pattern). Remaining MEDIUM: sink_d8 CCL fully materializes labels (inherently global), flow_accumulation_mfd frac_bdry held in driver dict instead of memmap-backed BoundaryStore. D8 iterative paths (flow_accum/fill/watershed/basin/stream_*) use serial-tile sweep with memmap-backed boundary store -- per-tile RAM bounded but driver iterates O(diameter) times. flow_direction_*, flow_path/snap_pour_point/twi/hand_d8/hand_dinf are SAFE." diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index ca05daef..b8fa54a5 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -1939,62 +1939,78 @@ def _read_strips(data: bytes, ifd: IFD, header: TIFFHeader, else: result = np.empty((out_h, out_w), dtype=dtype) + # Collect strip jobs; decode in parallel when the pool overhead pays off. + # Mirrors the tile path's gate at ``_PARALLEL_DECODE_PIXEL_THRESHOLD``: + # codec decode (deflate, zstd, LZW) releases the GIL inside the C + # extension so threads actually overlap codec work across cores. The + # placement loop that copies pixels into ``result`` stays serial to + # avoid contending writes to the output buffer. See issue #2100. + strip_jobs: list[tuple[int, int, int]] = [] # (band_idx, strip_idx, global_idx) if planar == 2 and samples > 1: first_strip = r0 // rps last_strip = min((r1 - 1) // rps, strips_per_band - 1) - for band_idx in range(samples): band_offset = band_idx * strips_per_band for strip_idx in range(first_strip, last_strip + 1): global_idx = band_offset + strip_idx if byte_counts[global_idx] == 0: - # Sparse strip: result is already pre-filled. continue strip_row = strip_idx * rps strip_rows = min(rps, height - strip_row) if strip_rows <= 0: continue - - strip_data = data[offsets[global_idx]:offsets[global_idx] + byte_counts[global_idx]] - strip_pixels = _decode_strip_or_tile( - strip_data, compression, width, strip_rows, 1, - bps, bytes_per_sample, is_sub_byte, dtype, pred, - byte_order=header.byte_order, - jpeg_tables=jpeg_tables, - masked_fill=masked_fill) - - src_r0 = max(r0 - strip_row, 0) - src_r1 = min(r1 - strip_row, strip_rows) - dst_r0 = max(strip_row - r0, 0) - dst_r1 = dst_r0 + (src_r1 - src_r0) - if dst_r1 > dst_r0: - result[dst_r0:dst_r1, :, band_idx] = strip_pixels[src_r0:src_r1, c0:c1] + strip_jobs.append((band_idx, strip_idx, global_idx)) + strip_samples = 1 else: first_strip = r0 // rps last_strip = min((r1 - 1) // rps, len(offsets) - 1) - for strip_idx in range(first_strip, last_strip + 1): strip_row = strip_idx * rps strip_rows = min(rps, height - strip_row) if strip_rows <= 0: continue if byte_counts[strip_idx] == 0: - # Sparse strip: result is already pre-filled. continue + strip_jobs.append((0, strip_idx, strip_idx)) + strip_samples = samples + + def _decode_strip_job(job): + _band_idx, strip_idx, global_idx = job + strip_row = strip_idx * rps + strip_rows = min(rps, height - strip_row) + strip_data = data[ + offsets[global_idx]:offsets[global_idx] + byte_counts[global_idx]] + return _decode_strip_or_tile( + strip_data, compression, width, strip_rows, strip_samples, + bps, bytes_per_sample, is_sub_byte, dtype, pred, + byte_order=header.byte_order, + jpeg_tables=jpeg_tables, + masked_fill=masked_fill) - strip_data = data[offsets[strip_idx]:offsets[strip_idx] + byte_counts[strip_idx]] - strip_pixels = _decode_strip_or_tile( - strip_data, compression, width, strip_rows, samples, - bps, bytes_per_sample, is_sub_byte, dtype, pred, - byte_order=header.byte_order, - jpeg_tables=jpeg_tables, - masked_fill=masked_fill) - - src_r0 = max(r0 - strip_row, 0) - src_r1 = min(r1 - strip_row, strip_rows) - dst_r0 = max(strip_row - r0, 0) - dst_r1 = dst_r0 + (src_r1 - src_r0) - if dst_r1 > dst_r0: + n_strips = len(strip_jobs) + strip_pixel_count = width * rps + use_parallel = (n_strips > 1 + and strip_pixel_count >= _PARALLEL_DECODE_PIXEL_THRESHOLD) + if use_parallel: + n_workers = min(n_strips, _os_module.cpu_count() or 4) + with ThreadPoolExecutor(max_workers=n_workers) as pool: + decoded_strips = list(pool.map(_decode_strip_job, strip_jobs)) + else: + decoded_strips = [_decode_strip_job(job) for job in strip_jobs] + + for (band_idx, strip_idx, _global_idx), strip_pixels in zip( + strip_jobs, decoded_strips): + strip_row = strip_idx * rps + strip_rows = min(rps, height - strip_row) + src_r0 = max(r0 - strip_row, 0) + src_r1 = min(r1 - strip_row, strip_rows) + dst_r0 = max(strip_row - r0, 0) + dst_r1 = dst_r0 + (src_r1 - src_r0) + if dst_r1 > dst_r0: + if planar == 2 and samples > 1: + result[dst_r0:dst_r1, :, band_idx] = ( + strip_pixels[src_r0:src_r1, c0:c1]) + else: result[dst_r0:dst_r1] = strip_pixels[src_r0:src_r1, c0:c1] return result @@ -2667,7 +2683,51 @@ def _fetch_decode_cog_http_strips( strip_bytes_list = [] # Pass 3: decode each strip and place its intersection with the window. - for (band_idx, strip_idx), strip_data in zip(placements, strip_bytes_list): + # + # Codec decode (deflate, zstd, LZW, ...) releases the GIL inside the C + # extension, so threading the per-strip decode overlaps codec work + # across cores. The local tile / strip paths and the HTTP tile path + # use the same ``_PARALLEL_DECODE_PIXEL_THRESHOLD`` gate; mirror it + # here so HTTP COG strip reads of wide windows benefit from the same + # parallelism rather than serialising the decode after a parallel + # fetch. The placement loop that copies pixels into ``result`` stays + # serial to avoid contending writes to the output buffer. Issue #2100. + n_decode_strips = len(strip_bytes_list) + strip_pixel_count = width * rps + decode_in_parallel = ( + n_decode_strips > 1 + and strip_pixel_count >= _PARALLEL_DECODE_PIXEL_THRESHOLD) + + def _decode_http_strip(args): + strip_idx, strip_data = args + strip_row = strip_idx * rps + strip_rows = min(rps, height - strip_row) + if strip_rows <= 0: + return None + # Per-strip decoded-dimension cap (#1851). See note below. + _check_dimensions(width, strip_rows, strip_samples, + MAX_PIXELS_DEFAULT) + return _decode_strip_or_tile( + strip_data, compression, width, strip_rows, strip_samples, + bps, bytes_per_sample, is_sub_byte, dtype, pred, + byte_order=header.byte_order, + jpeg_tables=jpeg_tables, + masked_fill=masked_fill) + + decode_inputs = [(strip_idx, strip_data) + for (_band, strip_idx), strip_data + in zip(placements, strip_bytes_list)] + + if decode_in_parallel: + n_decode_workers = min(n_decode_strips, _os_module.cpu_count() or 4) + with ThreadPoolExecutor(max_workers=n_decode_workers) as pool: + decoded_strips = list(pool.map(_decode_http_strip, decode_inputs)) + else: + decoded_strips = [_decode_http_strip(item) for item in decode_inputs] + + for (band_idx, strip_idx), strip_pixels in zip(placements, decoded_strips): + if strip_pixels is None: + continue strip_row = strip_idx * rps strip_rows = min(rps, height - strip_row) if strip_rows <= 0: @@ -2681,15 +2741,7 @@ def _fetch_decode_cog_http_strips( # the window clip. Use ``MAX_PIXELS_DEFAULT`` rather than the # caller's ``max_pixels`` so a small output-window budget does # not reject normal strip sizes. - _check_dimensions(width, strip_rows, strip_samples, - MAX_PIXELS_DEFAULT) - - strip_pixels = _decode_strip_or_tile( - strip_data, compression, width, strip_rows, strip_samples, - bps, bytes_per_sample, is_sub_byte, dtype, pred, - byte_order=header.byte_order, - jpeg_tables=jpeg_tables, - masked_fill=masked_fill) + # The decoded-dimension cap fired inside ``_decode_http_strip``. src_r0 = max(r0 - strip_row, 0) src_r1 = min(r1 - strip_row, strip_rows) diff --git a/xrspatial/geotiff/tests/test_parallel_strip_decode_2100.py b/xrspatial/geotiff/tests/test_parallel_strip_decode_2100.py new file mode 100644 index 00000000..4b6c4758 --- /dev/null +++ b/xrspatial/geotiff/tests/test_parallel_strip_decode_2100.py @@ -0,0 +1,367 @@ +"""Tests for parallel strip decode in ``_read_strips`` and +``_fetch_decode_cog_http_strips``. + +Issue #2100. Both strip paths previously decoded strips sequentially in a +Python for-loop. The matching tile paths use a ``ThreadPoolExecutor`` +gated on ``_PARALLEL_DECODE_PIXEL_THRESHOLD`` (64K pixels). These tests +codify the gate and verify correctness (parallel vs serial parity). +""" +from __future__ import annotations + +import http.server +import os +import socket +import tempfile +import threading +from unittest.mock import patch + +import numpy as np +import pytest +import xarray as xr + +from xrspatial.geotiff import to_geotiff +from xrspatial.geotiff import _reader as _reader_mod +from xrspatial.geotiff._reader import read_to_array + + +def _make_stripped_uint16(height: int, width: int, *, + compression: str = "deflate") -> bytes: + """Build a stripped TIFF in memory and return the bytes.""" + rng = np.random.default_rng(seed=12345) + arr = rng.integers(0, 256, size=(height, width), dtype=np.uint16) + da = xr.DataArray(arr, dims=["y", "x"]) + with tempfile.NamedTemporaryFile(suffix=".tif", delete=False) as f: + path = f.name + try: + to_geotiff(da, path, compression=compression, tiled=False) + with open(path, "rb") as f: + return arr, f.read() + finally: + try: + os.remove(path) + except OSError: + pass + + +class TestReadStripsParallelGate: + """Local strip path: the parallel-decode gate engages on multi-strip.""" + + def test_parallel_decode_matches_serial(self, tmp_path): + """A wide stripped TIFF: parallel and serial decode return + identical bytes. + + Uses pytest's ``tmp_path`` rather than + ``tempfile.TemporaryDirectory()`` because ``read_to_array`` + leaves the mmap entry cached on close (see ``_MmapCache``); + on Windows the cached mmap holds the file handle, so an + eager ``TemporaryDirectory.__exit__`` rmtree races the mmap + and raises ``WinError 32``. ``tmp_path`` defers cleanup to + pytest's session teardown, which tolerates that race. + """ + arr, blob = _make_stripped_uint16(2048, 4096) + p = str(tmp_path / "s.tif") + with open(p, "wb") as f: + f.write(blob) + par, _ = read_to_array(p) + with patch.object(_reader_mod, + "_PARALLEL_DECODE_PIXEL_THRESHOLD", 10**12): + ser, _ = read_to_array(p) + np.testing.assert_array_equal(par, ser) + np.testing.assert_array_equal(par, arr) + + def test_parallel_pool_engages_on_multi_strip(self, tmp_path): + """Confirm the parallel branch is taken: patch ThreadPoolExecutor + and assert it is constructed when n_strips > 1 and strip pixels + clear the gate.""" + arr, blob = _make_stripped_uint16(1024, 2048) + p = str(tmp_path / "s.tif") + with open(p, "wb") as f: + f.write(blob) + with patch.object(_reader_mod, "ThreadPoolExecutor", + wraps=_reader_mod.ThreadPoolExecutor) as mock_pool: + out, _ = read_to_array(p) + # n_strips for a 1024-row file with default rps -> at + # least 4 strips (TIFFs default rps=8KB / row). + assert mock_pool.called + np.testing.assert_array_equal(out, arr) + + def test_serial_path_used_for_small_strip(self, tmp_path): + """A single-row image will produce a single strip; the parallel + gate must short-circuit to the serial branch.""" + arr = np.arange(2 * 32, dtype=np.uint16).reshape(2, 32) + da = xr.DataArray(arr, dims=["y", "x"]) + p = str(tmp_path / "tiny.tif") + to_geotiff(da, p, compression="deflate", tiled=False) + with patch.object(_reader_mod, "ThreadPoolExecutor", + wraps=_reader_mod.ThreadPoolExecutor) as mock_pool: + out, _ = read_to_array(p) + # Single-strip file => no pool. + assert not mock_pool.called + np.testing.assert_array_equal(out, arr) + + def test_windowed_strip_read_parallel(self, tmp_path): + """A windowed read across multiple strips still matches the full + decode.""" + arr, blob = _make_stripped_uint16(2048, 2048) + p = str(tmp_path / "w.tif") + with open(p, "wb") as f: + f.write(blob) + par, _ = read_to_array(p, window=(100, 100, 1500, 1500)) + with patch.object(_reader_mod, + "_PARALLEL_DECODE_PIXEL_THRESHOLD", 10**12): + ser, _ = read_to_array(p, window=(100, 100, 1500, 1500)) + np.testing.assert_array_equal(par, ser) + np.testing.assert_array_equal(par, arr[100:1500, 100:1500]) + + +# ---------------------------------------------------------------------- +# HTTP COG strip path +# ---------------------------------------------------------------------- + +class _RangeHandler(http.server.BaseHTTPRequestHandler): + """Serve a fixed byte blob with HTTP Range support.""" + blob: bytes = b"" + + def do_HEAD(self): + self.send_response(200) + self.send_header("Content-Length", str(len(self.blob))) + self.send_header("Accept-Ranges", "bytes") + self.end_headers() + + def do_GET(self): + rng = self.headers.get("Range") + if rng and rng.startswith("bytes="): + r0, r1 = rng[len("bytes="):].split("-") + r0 = int(r0) + r1 = int(r1) if r1 else len(self.blob) - 1 + r1 = min(r1, len(self.blob) - 1) + body = self.blob[r0:r1 + 1] + self.send_response(206) + self.send_header("Content-Range", + f"bytes {r0}-{r1}/{len(self.blob)}") + self.send_header("Content-Length", str(len(body))) + self.send_header("Accept-Ranges", "bytes") + self.end_headers() + self.wfile.write(body) + else: + self.send_response(200) + self.send_header("Content-Length", str(len(self.blob))) + self.send_header("Accept-Ranges", "bytes") + self.end_headers() + self.wfile.write(self.blob) + + def log_message(self, format, *args): # quiet + return + + +def _start_server(blob: bytes): + """Start an HTTP server serving the given blob on a free port.""" + s = socket.socket() + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + handler = type( + "BlobHandler", (_RangeHandler,), {"blob": blob}) + server = http.server.HTTPServer(("127.0.0.1", port), handler) + th = threading.Thread(target=server.serve_forever, daemon=True) + th.start() + return server, port + + +class TestHttpStripParallelDecode: + def test_parallel_decode_matches_serial(self, monkeypatch): + monkeypatch.setenv("XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS", "1") + arr, blob = _make_stripped_uint16(1024, 2048, compression="deflate") + server, port = _start_server(blob) + try: + url = f"http://127.0.0.1:{port}/s.tif" + par, _ = read_to_array(url, window=(0, 0, 1024, 2048)) + with patch.object(_reader_mod, + "_PARALLEL_DECODE_PIXEL_THRESHOLD", 10**12): + ser, _ = read_to_array(url, window=(0, 0, 1024, 2048)) + finally: + server.shutdown() + np.testing.assert_array_equal(par, ser) + np.testing.assert_array_equal(par, arr) + + def test_serial_gate_engages_on_single_strip(self, monkeypatch): + """HTTP windowed-strip path with a single-strip source: the + gate at ``_fetch_decode_cog_http_strips`` (n_decode_strips > 1) + must short-circuit to the serial branch. + + ``ThreadPoolExecutor`` is also used by ``read_ranges`` to fan + out fetches, but for a single-range fetch that path takes the + ``len(ranges) == 1`` short-circuit and never instantiates a + pool. So spying on ``_decode_strip_or_tile`` and asserting the + thread identity catches a regression that fires the pool when + it shouldn't, without false positives from the fetch layer. + """ + monkeypatch.setenv("XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS", "1") + # 2x32 uint16 → 1 strip with the writer's default rps. + arr = np.arange(2 * 32, dtype=np.uint16).reshape(2, 32) + da = xr.DataArray(arr, dims=["y", "x"]) + with tempfile.NamedTemporaryFile(suffix=".tif", + delete=False) as f: + tif_path = f.name + try: + to_geotiff(da, tif_path, compression="deflate", + tiled=False) + with open(tif_path, "rb") as f: + blob = f.read() + finally: + try: + os.remove(tif_path) + except OSError: + pass + + threads_seen: list[int] = [] + real_decode = _reader_mod._decode_strip_or_tile + + def _spy(*args, **kwargs): + threads_seen.append(threading.get_ident()) + return real_decode(*args, **kwargs) + + monkeypatch.setattr( + _reader_mod, "_decode_strip_or_tile", _spy) + + server, port = _start_server(blob) + try: + url = f"http://127.0.0.1:{port}/tiny.tif" + out, _ = read_to_array(url, window=(0, 0, 2, 32)) + finally: + server.shutdown() + + main_tid = threading.get_ident() + assert threads_seen, "_decode_strip_or_tile was never called" + assert all(tid == main_tid for tid in threads_seen), ( + f"decode ran in worker thread(s) " + f"{set(threads_seen) - {main_tid}}; main is {main_tid}. " + f"The serial gate failed to short-circuit on a " + f"single-strip HTTP read." + ) + np.testing.assert_array_equal(out, arr) + + +# ---------------------------------------------------------------------- +# Planar=2 multi-band stripped TIFF +# ---------------------------------------------------------------------- + + +class TestPlanar2MultibandStripParallel: + """Planar=2 multi-band stripped TIFF. + + ``_read_strips`` has a separate ``planar == 2 and samples > 1`` + branch in the strip-job collection loop (xrspatial/geotiff/ + _reader.py:1949-1963). The parallel pool itself is planar-agnostic, + so a regression in band-ordering or per-band strip indexing would + survive the chunky tests above; this class covers that branch. + """ + + def test_parallel_matches_serial_planar2(self, tmp_path): + """Parallel and serial decode produce bit-identical output on + a planar=2 multi-band stripped TIFF.""" + tifffile = pytest.importorskip("tifffile") + rng = np.random.default_rng(seed=20260518) + bands, height, width = 3, 1024, 1024 + arr = rng.integers( + 0, 1000, size=(bands, height, width), dtype=np.uint16) + p = str(tmp_path / "planar2.tif") + tifffile.imwrite( + p, arr, + photometric="rgb", + planarconfig="separate", + compression="deflate", + rowsperstrip=128, + ) + + par, _ = read_to_array(p) + with patch.object( + _reader_mod, "_PARALLEL_DECODE_PIXEL_THRESHOLD", + 10 ** 12): + ser, _ = read_to_array(p) + + np.testing.assert_array_equal(par, ser) + # The reader returns (height, width, samples); the fixture is + # (bands, height, width). Reorder for comparison. + np.testing.assert_array_equal(par, np.moveaxis(arr, 0, -1)) + + def test_windowed_planar2_parallel(self, tmp_path): + """A window across multiple strips on a planar=2 multi-band + stripped TIFF still matches the slice of the full decode.""" + tifffile = pytest.importorskip("tifffile") + rng = np.random.default_rng(seed=20260519) + bands, height, width = 3, 1024, 1024 + arr = rng.integers( + 0, 1000, size=(bands, height, width), dtype=np.uint16) + p = str(tmp_path / "planar2_win.tif") + tifffile.imwrite( + p, arr, + photometric="rgb", + planarconfig="separate", + compression="deflate", + rowsperstrip=128, + ) + + par, _ = read_to_array(p, window=(100, 100, 900, 900)) + with patch.object( + _reader_mod, "_PARALLEL_DECODE_PIXEL_THRESHOLD", + 10 ** 12): + ser, _ = read_to_array(p, window=(100, 100, 900, 900)) + + np.testing.assert_array_equal(par, ser) + expected = np.moveaxis(arr, 0, -1)[100:900, 100:900] + np.testing.assert_array_equal(par, expected) + + def test_http_windowed_planar2_parallel(self, monkeypatch): + """HTTP windowed strip path on planar=2 multi-band: pins the + per-band strip-job loop inside + ``_fetch_decode_cog_http_strips`` (the local path's planar=2 + coverage above does not reach the HTTP fetch+decode branch + because full-image HTTP reads dispatch back to + ``_read_strips``).""" + tifffile = pytest.importorskip("tifffile") + monkeypatch.setenv( + "XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS", "1") + rng = np.random.default_rng(seed=20260520) + bands, height, width = 3, 1024, 1024 + arr = rng.integers( + 0, 1000, size=(bands, height, width), dtype=np.uint16) + with tempfile.NamedTemporaryFile( + suffix=".tif", delete=False) as f: + tif_path = f.name + try: + tifffile.imwrite( + tif_path, arr, + photometric="rgb", + planarconfig="separate", + compression="deflate", + rowsperstrip=128, + ) + with open(tif_path, "rb") as f: + blob = f.read() + finally: + try: + os.remove(tif_path) + except OSError: + pass + + server, port = _start_server(blob) + try: + url = f"http://127.0.0.1:{port}/planar2.tif" + par, _ = read_to_array( + url, window=(100, 100, 900, 900)) + with patch.object( + _reader_mod, "_PARALLEL_DECODE_PIXEL_THRESHOLD", + 10 ** 12): + ser, _ = read_to_array( + url, window=(100, 100, 900, 900)) + finally: + server.shutdown() + + np.testing.assert_array_equal(par, ser) + expected = np.moveaxis(arr, 0, -1)[100:900, 100:900] + np.testing.assert_array_equal(par, expected) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])