65 changes: 58 additions & 7 deletions xrspatial/geotiff/__init__.py
@@ -460,9 +460,33 @@ def _read_geo_info(source, *, overview_level: int | None = None):
     from ._dtypes import resolve_bits_per_sample, tiff_dtype_to_numpy
     from ._geotags import extract_geo_info_with_overview_inheritance
     from ._header import parse_all_ifds, parse_header, select_overview_ifd
-    from ._reader import _coerce_path, _is_file_like
+    from ._reader import (
+        _CloudSource, _coerce_path, _is_file_like, _is_fsspec_uri,
+        _parse_cog_http_meta,
+    )

     source = _coerce_path(source)
+    if isinstance(source, str) and _is_fsspec_uri(source):
+        # fsspec URI (s3://, gs://, az://, memory://, ...): use the
+        # bounded-prefetch metadata parser instead of downloading the
+        # full remote object. ``_parse_cog_http_meta`` only needs
+        # ``read_range`` on the source, which ``_CloudSource`` provides;
+        # it grows a small range buffer until the IFD chain resolves
+        # (capped by ``MAX_HTTP_HEADER_BYTES``). This avoids the
+        # whole-file fetch that would otherwise happen on every
+        # ``open_geotiff(..., chunks=...)`` graph build for a large COG.
+        _src = _CloudSource(source)
+        try:
+            _header, _ifd, geo_info, _ = _parse_cog_http_meta(
+                _src, overview_level=overview_level)
+        finally:
+            _src.close()
+        bps = resolve_bits_per_sample(_ifd.bits_per_sample)
+        file_dtype = tiff_dtype_to_numpy(bps, _ifd.sample_format)
+        n_bands = (
+            _ifd.samples_per_pixel if _ifd.samples_per_pixel > 1 else 0
+        )
+        return geo_info, _ifd.height, _ifd.width, file_dtype, n_bands
     if _is_file_like(source):
         # File-like: read its full bytes; we don't try to mmap arbitrary
         # buffers because they may not be backed by a real file descriptor.
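The only contract ``_parse_cog_http_meta`` places on its source is a
``read_range(start, length) -> bytes`` method (plus ``close`` for cleanup),
which is why ``_CloudSource`` can stand in for ``_HTTPSource`` here. A
minimal sketch of that duck-typed contract; the ``BytesSource`` class below
is purely illustrative and not part of this PR:

    # Illustrative only: a toy in-memory source meeting the same duck-typed
    # contract that _CloudSource and _HTTPSource satisfy.
    class BytesSource:
        def __init__(self, data: bytes):
            self._data = data

        def read_range(self, start: int, length: int) -> bytes:
            # Serve an arbitrary byte range, like an HTTP Range GET or an
            # fsspec seek-and-read.
            return self._data[start:start + length]

        def close(self) -> None:
            pass  # nothing to release for an in-memory buffer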
@@ -1918,16 +1942,28 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512,

     # P5: HTTP COG sources used to fire one IFD/header GET per chunk
     # task. Parse metadata once here so every delayed task can reuse it.
+    # The same prefetch path also covers fsspec URIs (s3://, gs://, ...);
+    # ``_parse_cog_http_meta`` only needs a source exposing ``read_range``,
+    # and ``_CloudSource`` satisfies that contract. Going through it
+    # bounds metadata reads to ``MAX_HTTP_HEADER_BYTES`` instead of
+    # fetching the whole remote object up front. See PR #1755 review.
     is_http = (
         isinstance(source, str)
         and source.startswith(('http://', 'https://'))
     )
+    from ._reader import _is_fsspec_uri
+    is_fsspec = isinstance(source, str) and _is_fsspec_uri(source)
     http_meta = None
     http_meta_key = None
-    if is_http:
+    if is_http or is_fsspec:
         import dask
-        from ._reader import _HTTPSource, _parse_cog_http_meta
-        _src = _HTTPSource(source)
+        from ._reader import _parse_cog_http_meta
+        if is_http:
+            from ._reader import _HTTPSource
+            _src = _HTTPSource(source)
+        else:
+            from ._reader import _CloudSource
+            _src = _CloudSource(source)
         try:
             http_header, http_ifd, http_geo, _ = _parse_cog_http_meta(
                 _src, overview_level=overview_level)
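The P5 fix is a standard case of hoisting invariant work out of a task
graph: parse the metadata once at graph-build time and thread the result
through every delayed task, instead of re-fetching headers per chunk. A
generic sketch of the pattern, with illustrative function names that are
not this module's API:

    import dask

    def parse_metadata_once(source: str) -> dict:
        # Stand-in for the bounded metadata parse: one small read at
        # graph-build time rather than one header fetch per chunk task.
        return {'source': source, 'tile_size': 512}

    @dask.delayed
    def read_chunk(i: int, meta: dict) -> tuple:
        # Every task reuses the prefetched metadata; no per-task
        # header/IFD round-trips.
        return (i, meta['tile_size'])

    meta = parse_metadata_once('s3://bucket/large.tif')
    results = dask.compute(*[read_chunk(i, meta) for i in range(4)])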
Expand Down Expand Up @@ -2155,14 +2191,29 @@ def _delayed_read_window(source, r0, c0, r1, c1, overview_level, nodata,

         @dask.delayed
         def _read(http_meta):
+            # The prefetched-metadata fast path covers both HTTP COGs and
+            # fsspec-addressable remotes (s3://, gs://, az://, memory://, ...).
+            # Both source classes expose ``read_range``, which is all
+            # ``_fetch_decode_cog_http_tiles`` needs.
+            _is_http_src = isinstance(source, str) and source.startswith(
+                ('http://', 'https://'))
+            _is_fsspec_src = False
             if http_meta is not None and isinstance(source, str) and \
-                    source.startswith(('http://', 'https://')):
+                    not _is_http_src:
+                from ._reader import _is_fsspec_uri as _ifs
+                _is_fsspec_src = _ifs(source)
+            if http_meta is not None and (_is_http_src or _is_fsspec_src):
                 from ._reader import (
-                    _HTTPSource, _fetch_decode_cog_http_tiles,
+                    _fetch_decode_cog_http_tiles,
                     MAX_PIXELS_DEFAULT,
                 )
                 header, ifd = http_meta
-                src = _HTTPSource(source)
+                if _is_http_src:
+                    from ._reader import _HTTPSource
+                    src = _HTTPSource(source)
+                else:
+                    from ._reader import _CloudSource
+                    src = _CloudSource(source)
                 try:
                     arr = _fetch_decode_cog_http_tiles(
                         src, header, ifd,
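The branching above is a three-way dispatch on the source string: plain
HTTP(S) keeps the existing ``_HTTPSource`` path, any other fsspec scheme
routes through ``_CloudSource``, and everything else is handled as a local
path. A self-contained sketch of that dispatch; ``pick_source_class`` is an
illustrative approximation, since ``_is_fsspec_uri``'s exact rules are not
shown in this diff:

    from urllib.parse import urlparse

    def pick_source_class(source: str) -> str:
        # Approximates the PR's branching: http/https -> _HTTPSource,
        # any other scheme (s3, gs, az, memory, ...) -> _CloudSource,
        # no scheme -> plain local-file handling.
        scheme = urlparse(source).scheme
        if scheme in ('http', 'https'):
            return '_HTTPSource'
        if scheme:
            return '_CloudSource'
        return 'local'

    assert pick_source_class('https://example.com/cog.tif') == '_HTTPSource'
    assert pick_source_class('s3://bucket/cog.tif') == '_CloudSource'
    assert pick_source_class('/tmp/cog.tif') == 'local'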
49 changes: 49 additions & 0 deletions xrspatial/geotiff/_reader.py
@@ -1002,6 +1002,55 @@ def read_range(self, start: int, length: int) -> bytes:
f.seek(start)
return f.read(length)

    def read_ranges(
self,
ranges: list[tuple[int, int]],
max_workers: int = 8,
) -> list[bytes]:
"""Fetch multiple ranges concurrently using a thread pool.

Mirrors :meth:`_HTTPSource.read_ranges` so that
:func:`_fetch_decode_cog_http_tiles` can drive a cloud source
the same way it drives an HTTP source. See PR #1755.
"""
if not ranges:
return []
if len(ranges) == 1:
start, length = ranges[0]
return [self.read_range(start, length)]

workers = min(max_workers, len(ranges))
results: list[bytes | None] = [None] * len(ranges)

with ThreadPoolExecutor(max_workers=workers) as ex:
future_to_idx = {
ex.submit(self.read_range, start, length): i
for i, (start, length) in enumerate(ranges)
}
for fut in future_to_idx:
idx = future_to_idx[fut]
results[idx] = fut.result()

return results # type: ignore[return-value]

    def read_ranges_coalesced(
self,
ranges: list[tuple[int, int]],
max_workers: int = 8,
gap_threshold: int = COALESCE_GAP_THRESHOLD_DEFAULT,
) -> list[bytes]:
"""Fetch *ranges* using merged GETs where adjacent ranges allow it.

Mirrors :meth:`_HTTPSource.read_ranges_coalesced` so the tiled
COG decode path can coalesce neighbouring tiles when reading
from object storage.
"""
if not ranges:
return []
merged, mapping = coalesce_ranges(ranges, gap_threshold=gap_threshold)
merged_bytes = self.read_ranges(merged, max_workers=max_workers)
return split_coalesced_bytes(merged_bytes, mapping)

    def read_all(self) -> bytes:
with self._fs.open(self._path, 'rb') as f:
return f.read()
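Two details worth noting: ``read_ranges`` writes each result back at its
submission index, so callers receive bytes in the same order they requested
them, and the coalesced variant trades a few throwaway gap bytes for fewer
GET round-trips. A worked sketch of the merge rule follows;
``coalesce_ranges`` and ``split_coalesced_bytes`` are real helpers in
``_reader.py`` whose bodies are not part of this diff, so the toy
``coalesce`` below is an assumed equivalent, not the PR's code:

    def coalesce(ranges, gap_threshold):
        # Merge byte ranges whose gap is at most gap_threshold into one GET.
        # A real implementation also maps results back to the caller's
        # original request order; this toy works on sorted input only.
        merged = []    # merged (start, length) requests
        mapping = []   # (merged_index, offset, length) per sorted range
        for start, length in sorted(ranges):
            last_end = merged[-1][0] + merged[-1][1] if merged else None
            if merged and start - last_end <= gap_threshold:
                m_start, m_len = merged[-1]
                merged[-1] = (m_start, max(m_len, start + length - m_start))
            else:
                merged.append((start, length))
            mapping.append((len(merged) - 1, start - merged[-1][0], length))
        return merged, mapping

    # Two tiles 4 bytes apart collapse into one request when the gap is
    # within the threshold; the 4 gap bytes are fetched and discarded.
    merged, mapping = coalesce([(0, 10), (14, 6)], gap_threshold=4)
    assert merged == [(0, 20)]
    assert mapping == [(0, 0, 10), (0, 14, 6)]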
74 changes: 74 additions & 0 deletions xrspatial/geotiff/tests/test_features.py
@@ -1051,6 +1051,80 @@ def test_memory_filesystem_full_roundtrip(self, tmp_path):

fs.rm('/roundtrip.tif')

    def test_dask_path_fsspec_uri_1749(self, tmp_path):
"""read_geotiff_dask supports fsspec URIs (issue #1749).

The eager path already routed through _CloudSource via
_read_to_array. The dask path's _read_geo_info used plain
open(), which failed on memory://, s3://, etc.
"""
pytest.importorskip('fsspec')
import fsspec

arr = np.arange(64, dtype=np.float32).reshape(8, 8)

local_path = str(tmp_path / 'src.tif')
to_geotiff(arr, local_path, compression='none')
with open(local_path, 'rb') as f:
tiff_bytes = f.read()

fs = fsspec.filesystem('memory')
fs.pipe('/dask_1749_full.tif', tiff_bytes)

try:
eager = open_geotiff('memory:///dask_1749_full.tif')
lazy = open_geotiff('memory:///dask_1749_full.tif', chunks=4)

# Lazy path is dask-backed
import dask.array as da
assert isinstance(lazy.data, da.Array)

np.testing.assert_array_equal(lazy.values, eager.values)
np.testing.assert_array_equal(lazy.values, arr)
finally:
fs.rm('/dask_1749_full.tif')

    def test_dask_path_fsspec_uri_no_full_download_1749(self, tmp_path,
                                                        monkeypatch):
"""Dask graph build for fsspec URIs must not pull the whole file.

``_read_geo_info`` previously called ``_CloudSource.read_all`` to
parse metadata. For a large COG on S3 that downloads the whole
object just to learn its shape/transform. The fix routes fsspec
sources through ``_parse_cog_http_meta``, which only uses
``read_range``. Guard against regression by failing the test if
``read_all`` runs during ``open_geotiff(..., chunks=...)``. See
PR #1755 review.
"""
pytest.importorskip('fsspec')
import fsspec

arr = np.arange(64, dtype=np.float32).reshape(8, 8)

local_path = str(tmp_path / 'src.tif')
to_geotiff(arr, local_path, compression='none')
with open(local_path, 'rb') as f:
tiff_bytes = f.read()

fs = fsspec.filesystem('memory')
fs.pipe('/dask_1749_nofull.tif', tiff_bytes)

from xrspatial.geotiff import _reader as _reader_mod

def _no_read_all(self):
raise AssertionError(
"_CloudSource.read_all called during dask graph build")

monkeypatch.setattr(
_reader_mod._CloudSource, 'read_all', _no_read_all)

try:
lazy = open_geotiff('memory:///dask_1749_nofull.tif', chunks=4)
# Materialise to confirm the chunk tasks also avoid read_all.
np.testing.assert_array_equal(lazy.values, arr)
finally:
fs.rm('/dask_1749_nofull.tif')

    def test_writer_cloud_scheme_detection(self):
"""Writer detects cloud schemes."""
from xrspatial.geotiff._writer import _is_fsspec_uri