From e127471921be17b04d83f53d64454c0f78ffa068 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Tue, 12 May 2026 16:43:42 -0700 Subject: [PATCH 1/2] geotiff: route _read_geo_info through _CloudSource for fsspec URIs (#1749) read_geotiff_dask failed on s3://, gs://, az://, memory:// and other fsspec URIs because the metadata-only step (_read_geo_info) used a plain open(source, 'rb') call. The eager path already handled cloud URIs via _read_to_array + _CloudSource, so the dask graph's per-chunk pixel reads were already cloud-aware; only the upfront metadata read broke. Detect fsspec URIs in _read_geo_info and pull the file bytes via _CloudSource.read_all(). Local-path mmap fast path is unchanged. HTTP path is unchanged and continues to use _parse_cog_http_meta. Closes #1749. --- xrspatial/geotiff/__init__.py | 13 +++++++++- xrspatial/geotiff/tests/test_features.py | 33 ++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index feffe706..f212be14 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -460,7 +460,7 @@ def _read_geo_info(source, *, overview_level: int | None = None): from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy from ._geotags import extract_geo_info_with_overview_inheritance from ._header import parse_all_ifds, parse_header, select_overview_ifd - from ._reader import _coerce_path, _is_file_like + from ._reader import _CloudSource, _coerce_path, _is_file_like, _is_fsspec_uri source = _coerce_path(source) if _is_file_like(source): @@ -477,6 +477,17 @@ def _read_geo_info(source, *, overview_level: int | None = None): except (OSError, AttributeError): pass close_data = False + elif isinstance(source, str) and _is_fsspec_uri(source): + # fsspec URI (s3://, gs://, az://, memory://, ...): pull the + # whole file via _CloudSource for metadata parsing. 
Per-chunk + # pixel reads in the dask graph go through _read_to_array + # which opens its own _CloudSource, so this fetch is metadata-only. + _src = _CloudSource(source) + try: + data = _src.read_all() + finally: + _src.close() + close_data = False elif isinstance(source, str): with open(source, 'rb') as f: import mmap diff --git a/xrspatial/geotiff/tests/test_features.py b/xrspatial/geotiff/tests/test_features.py index 19863785..706e8b95 100644 --- a/xrspatial/geotiff/tests/test_features.py +++ b/xrspatial/geotiff/tests/test_features.py @@ -1051,6 +1051,39 @@ def test_memory_filesystem_full_roundtrip(self, tmp_path): fs.rm('/roundtrip.tif') + def test_dask_path_fsspec_uri_1749(self, tmp_path): + """read_geotiff_dask supports fsspec URIs (issue #1749). + + The eager path already routed through _CloudSource via + _read_to_array. The dask path's _read_geo_info used plain + open(), which failed on memory://, s3://, etc. + """ + pytest.importorskip('fsspec') + import fsspec + + arr = np.arange(64, dtype=np.float32).reshape(8, 8) + + local_path = str(tmp_path / 'src.tif') + to_geotiff(arr, local_path, compression='none') + with open(local_path, 'rb') as f: + tiff_bytes = f.read() + + fs = fsspec.filesystem('memory') + fs.pipe('/dask_1749.tif', tiff_bytes) + + try: + eager = open_geotiff('memory:///dask_1749.tif') + lazy = open_geotiff('memory:///dask_1749.tif', chunks=4) + + # Lazy path is dask-backed + import dask.array as da + assert isinstance(lazy.data, da.Array) + + np.testing.assert_array_equal(lazy.values, eager.values) + np.testing.assert_array_equal(lazy.values, arr) + finally: + fs.rm('/dask_1749.tif') + def test_writer_cloud_scheme_detection(self): """Writer detects cloud schemes.""" from xrspatial.geotiff._writer import _is_fsspec_uri From 16b76eabc6c2a7ef854f3ae55c650f69d499e7d4 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Wed, 13 May 2026 03:57:59 -0700 Subject: [PATCH 2/2] geotiff: bound fsspec metadata reads via range prefetch The prior commit 
added an fsspec branch in _read_geo_info that called _CloudSource.read_all() to parse metadata. For a large COG on S3 this pulls the whole object into memory just to learn its shape/transform, defeating the O(1) memory intent of _read_geo_info. Route fsspec URIs through _parse_cog_http_meta (same path as HTTP COGs). It only requires a source providing read_range, which _CloudSource satisfies, and grows a bounded buffer (capped at MAX_HTTP_HEADER_BYTES) until the IFD chain resolves. Applies to both _read_geo_info itself (used by the .raster accessor) and the read_geotiff_dask metadata prefetch. Per-chunk tile reads in the dask graph now also dispatch through _fetch_decode_cog_http_tiles with a _CloudSource instead of falling back to _read_to_array's read_all path. Adds read_ranges and read_ranges_coalesced methods to _CloudSource so the tiled COG decode path can drive a cloud source the same way it drives an HTTP source. Extends the regression test to assert _CloudSource.read_all is not called during dask graph construction or chunk materialisation. Addresses PR #1755 review comment. 
--- xrspatial/geotiff/__init__.py | 76 ++++++++++++++++++------ xrspatial/geotiff/_reader.py | 49 +++++++++++++++ xrspatial/geotiff/tests/test_features.py | 49 +++++++++++++-- 3 files changed, 152 insertions(+), 22 deletions(-) diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index f212be14..04fa6bd5 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -460,9 +460,33 @@ def _read_geo_info(source, *, overview_level: int | None = None): from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy from ._geotags import extract_geo_info_with_overview_inheritance from ._header import parse_all_ifds, parse_header, select_overview_ifd - from ._reader import _CloudSource, _coerce_path, _is_file_like, _is_fsspec_uri + from ._reader import ( + _CloudSource, _coerce_path, _is_file_like, _is_fsspec_uri, + _parse_cog_http_meta, + ) source = _coerce_path(source) + if isinstance(source, str) and _is_fsspec_uri(source): + # fsspec URI (s3://, gs://, az://, memory://, ...): use the + # bounded-prefetch metadata parser instead of downloading the + # full remote object. ``_parse_cog_http_meta`` only needs + # ``read_range`` on the source, which ``_CloudSource`` provides; + # it grows a small range buffer until the IFD chain resolves + # (capped by ``MAX_HTTP_HEADER_BYTES``). Avoids the + # whole-file fetch that would otherwise happen on every + # ``open_geotiff(..., chunks=...)`` graph build for a large COG. 
+ _src = _CloudSource(source) + try: + _header, _ifd, geo_info, _ = _parse_cog_http_meta( + _src, overview_level=overview_level) + finally: + _src.close() + bps = resolve_bits_per_sample(_ifd.bits_per_sample) + file_dtype = tiff_dtype_to_numpy(bps, _ifd.sample_format) + n_bands = ( + _ifd.samples_per_pixel if _ifd.samples_per_pixel > 1 else 0 + ) + return geo_info, _ifd.height, _ifd.width, file_dtype, n_bands if _is_file_like(source): # File-like: read its full bytes; we don't try to mmap arbitrary # buffers because they may not back a real file descriptor. @@ -477,17 +501,6 @@ def _read_geo_info(source, *, overview_level: int | None = None): except (OSError, AttributeError): pass close_data = False - elif isinstance(source, str) and _is_fsspec_uri(source): - # fsspec URI (s3://, gs://, az://, memory://, ...): pull the - # whole file via _CloudSource for metadata parsing. Per-chunk - # pixel reads in the dask graph go through _read_to_array - # which opens its own _CloudSource, so this fetch is metadata-only. - _src = _CloudSource(source) - try: - data = _src.read_all() - finally: - _src.close() - close_data = False elif isinstance(source, str): with open(source, 'rb') as f: import mmap @@ -1929,16 +1942,28 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, # P5: HTTP COG sources used to fire one IFD/header GET per chunk # task. Parse metadata once here so every delayed task can reuse it. + # The same prefetch path also covers fsspec URIs (s3://, gs://, ...); + # ``_parse_cog_http_meta`` only needs a ``read_range``-having source, + # and ``_CloudSource`` satisfies that contract. Going through it + # bounds metadata reads to ``MAX_HTTP_HEADER_BYTES`` instead of + # fetching the whole remote object up front. See PR #1755 review. 
is_http = ( isinstance(source, str) and source.startswith(('http://', 'https://')) ) + from ._reader import _is_fsspec_uri + is_fsspec = isinstance(source, str) and _is_fsspec_uri(source) http_meta = None http_meta_key = None - if is_http: + if is_http or is_fsspec: import dask - from ._reader import _HTTPSource, _parse_cog_http_meta - _src = _HTTPSource(source) + from ._reader import _parse_cog_http_meta + if is_http: + from ._reader import _HTTPSource + _src = _HTTPSource(source) + else: + from ._reader import _CloudSource + _src = _CloudSource(source) try: http_header, http_ifd, http_geo, _ = _parse_cog_http_meta( _src, overview_level=overview_level) @@ -2166,14 +2191,29 @@ def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata, @dask.delayed def _read(http_meta): + # The prefetched-metadata fast path covers both HTTP COGs and + # fsspec-addressable remotes (s3://, gs://, az://, memory://, ...). + # Both source classes expose ``read_range``, which is all + # ``_fetch_decode_cog_http_tiles`` needs. 
+ _is_http_src = isinstance(source, str) and source.startswith( + ('http://', 'https://')) + _is_fsspec_src = False if http_meta is not None and isinstance(source, str) and \ - source.startswith(('http://', 'https://')): + not _is_http_src: + from ._reader import _is_fsspec_uri as _ifs + _is_fsspec_src = _ifs(source) + if http_meta is not None and (_is_http_src or _is_fsspec_src): from ._reader import ( - _HTTPSource, _fetch_decode_cog_http_tiles, + _fetch_decode_cog_http_tiles, MAX_PIXELS_DEFAULT, ) header, ifd = http_meta - src = _HTTPSource(source) + if _is_http_src: + from ._reader import _HTTPSource + src = _HTTPSource(source) + else: + from ._reader import _CloudSource + src = _CloudSource(source) try: arr = _fetch_decode_cog_http_tiles( src, header, ifd, diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index e76ee1c4..b97d6321 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -1002,6 +1002,55 @@ def read_range(self, start: int, length: int) -> bytes: f.seek(start) return f.read(length) + def read_ranges( + self, + ranges: list[tuple[int, int]], + max_workers: int = 8, + ) -> list[bytes]: + """Fetch multiple ranges concurrently using a thread pool. + + Mirrors :meth:`_HTTPSource.read_ranges` so that + :func:`_fetch_decode_cog_http_tiles` can drive a cloud source + the same way it drives an HTTP source. See PR #1755. 
+ """ + if not ranges: + return [] + if len(ranges) == 1: + start, length = ranges[0] + return [self.read_range(start, length)] + + workers = min(max_workers, len(ranges)) + results: list[bytes | None] = [None] * len(ranges) + + with ThreadPoolExecutor(max_workers=workers) as ex: + future_to_idx = { + ex.submit(self.read_range, start, length): i + for i, (start, length) in enumerate(ranges) + } + for fut in future_to_idx: + idx = future_to_idx[fut] + results[idx] = fut.result() + + return results # type: ignore[return-value] + + def read_ranges_coalesced( + self, + ranges: list[tuple[int, int]], + max_workers: int = 8, + gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT, + ) -> list[bytes]: + """Fetch *ranges* using merged GETs where adjacent ranges allow it. + + Mirrors :meth:`_HTTPSource.read_ranges_coalesced` so the tiled + COG decode path can coalesce neighbouring tiles when reading + from object storage. + """ + if not ranges: + return [] + merged, mapping = coalesce_ranges(ranges, gap_threshold=gap_threshold) + merged_bytes = self.read_ranges(merged, max_workers=max_workers) + return split_coalesced_bytes(merged_bytes, mapping) + def read_all(self) -> bytes: with self._fs.open(self._path, 'rb') as f: return f.read() diff --git a/xrspatial/geotiff/tests/test_features.py b/xrspatial/geotiff/tests/test_features.py index 706e8b95..29318a1d 100644 --- a/xrspatial/geotiff/tests/test_features.py +++ b/xrspatial/geotiff/tests/test_features.py @@ -1069,11 +1069,11 @@ def test_dask_path_fsspec_uri_1749(self, tmp_path): tiff_bytes = f.read() fs = fsspec.filesystem('memory') - fs.pipe('/dask_1749.tif', tiff_bytes) + fs.pipe('/dask_1749_full.tif', tiff_bytes) try: - eager = open_geotiff('memory:///dask_1749.tif') - lazy = open_geotiff('memory:///dask_1749.tif', chunks=4) + eager = open_geotiff('memory:///dask_1749_full.tif') + lazy = open_geotiff('memory:///dask_1749_full.tif', chunks=4) # Lazy path is dask-backed import dask.array as da @@ -1082,7 +1082,48 @@ def 
test_dask_path_fsspec_uri_1749(self, tmp_path): np.testing.assert_array_equal(lazy.values, eager.values) np.testing.assert_array_equal(lazy.values, arr) finally: - fs.rm('/dask_1749.tif') + fs.rm('/dask_1749_full.tif') + + def test_dask_path_fsspec_uri_no_full_download_1749(self, tmp_path, + monkeypatch): + """Dask graph build for fsspec URIs must not pull the whole file. + + ``_read_geo_info`` previously called ``_CloudSource.read_all`` to + parse metadata. For a large COG on S3 that downloads the whole + object just to learn its shape/transform. The fix routes fsspec + sources through ``_parse_cog_http_meta``, which only uses + ``read_range``. Guard against regression by failing the test if + ``read_all`` runs during ``open_geotiff(..., chunks=...)``. See + PR #1755 review. + """ + pytest.importorskip('fsspec') + import fsspec + + arr = np.arange(64, dtype=np.float32).reshape(8, 8) + + local_path = str(tmp_path / 'src.tif') + to_geotiff(arr, local_path, compression='none') + with open(local_path, 'rb') as f: + tiff_bytes = f.read() + + fs = fsspec.filesystem('memory') + fs.pipe('/dask_1749_nofull.tif', tiff_bytes) + + from xrspatial.geotiff import _reader as _reader_mod + + def _no_read_all(self): + raise AssertionError( + "_CloudSource.read_all called during dask graph build") + + monkeypatch.setattr( + _reader_mod._CloudSource, 'read_all', _no_read_all) + + try: + lazy = open_geotiff('memory:///dask_1749_nofull.tif', chunks=4) + # Materialise to confirm the chunk tasks also avoid read_all. + np.testing.assert_array_equal(lazy.values, arr) + finally: + fs.rm('/dask_1749_nofull.tif') def test_writer_cloud_scheme_detection(self): """Writer detects cloud schemes."""