From 774b9229b43770fdf057d71497b74b0419730f5d Mon Sep 17 00:00:00 2001 From: Juniper Tyree Date: Mon, 24 Feb 2025 13:20:28 +0200 Subject: [PATCH 1/8] Add ESA biomass cci dataset --- pyproject.toml | 3 +- src/climatebenchpress/data_loader/canon.py | 12 ++++- .../data_loader/datasets/all.py | 3 +- .../data_loader/datasets/esa_biomass_cci.py | 53 +++++++++++++++++++ 4 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py diff --git a/pyproject.toml b/pyproject.toml index fd4bbe8..4566ab1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ optional-dependencies.data = [ "aiohttp~=3.11.0", + "kerchunk[hdf]~=0.2.0", "pandas~=2.2.0", ] @@ -39,5 +40,5 @@ where = ["src"] addopts = ["--import-mode=importlib"] [[tool.mypy.overrides]] -module = ["fsspec.*"] +module = ["fsspec.*", "kerchunk.*"] follow_untyped_imports = true diff --git a/src/climatebenchpress/data_loader/canon.py b/src/climatebenchpress/data_loader/canon.py index 7989525..dcc4d01 100644 --- a/src/climatebenchpress/data_loader/canon.py +++ b/src/climatebenchpress/data_loader/canon.py @@ -23,13 +23,23 @@ def _ensure_axis(da: xr.DataArray, c: str) -> tuple[xr.DataArray, str]: def canonicalize_variable(da: xr.DataArray) -> xr.DataArray: + if len(da.dims) == 0: + return da + + da_old = da.copy(deep=False) + da, realization = _ensure_axis(da, "E") da, time = _ensure_axis(da, "T") da, vertical = _ensure_axis(da, "Z") da, latitude = _ensure_axis(da, "Y") da, longitude = _ensure_axis(da, "X") - return da.transpose(realization, time, vertical, latitude, longitude) + new_dims = [realization, time, vertical, latitude, longitude] + + if not all(d in new_dims for d in da.dims): + return da_old + + return da.transpose(*new_dims) def canonicalize_dataset(ds: xr.Dataset): diff --git a/src/climatebenchpress/data_loader/datasets/all.py b/src/climatebenchpress/data_loader/datasets/all.py index a08a6b6..0e3f49d 100644 --- a/src/climatebenchpress/data_loader/datasets/all.py +++ b/src/climatebenchpress/data_loader/datasets/all.py @@ -1,4 +1,5 @@ # ruff: noqa: F403 -from .era5 import * from .cmip6.all import * +from .era5 import * +from .esa_biomass_cci import * diff --git a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py new file mode 100644 index 0000000..c7db5d2 --- /dev/null +++ b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py @@ -0,0 +1,53 @@ +__all__ = ["EsaBiomassCciDataset"] + +import kerchunk.hdf +import xarray as xr + +from .abc import Dataset +from .. import ( + open_downloaded_canonicalized_dataset, + open_downloaded_tiny_canonicalized_dataset, +) + + +class EsaBiomassCciDataset(Dataset): + name = "esa-biomass-cci" + + @staticmethod + def open() -> xr.Dataset: + urls = [ + f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" + for year in [2010, 2015, 2016, 2017, 2018, 2019, 2020, 2021] + ] + + kcs = [ + kerchunk.hdf.SingleHdf5ToZarr( + url, + inline_threshold=0, + error="raise", + ).translate() + for url in urls + ] + + dss = [ + xr.open_dataset( + "reference://", + engine="zarr", + backend_kwargs=dict( + storage_options=dict(fo=kc), + ), + consolidated=False, + chunks=dict(), + ) + for kc in kcs + ] + + return xr.concat(dss, dim="time") + + +if __name__ == "__main__": + ds = open_downloaded_canonicalized_dataset(EsaBiomassCciDataset) + open_downloaded_tiny_canonicalized_dataset(EsaBiomassCciDataset) + + for v, da in ds.items(): + print(f"- {v}: {da.dims}") From 9cad94029cd13db05d812b7ff18491dc36f298d8 Mon Sep 17 00:00:00 2001 From: Juniper Tyree Date: Mon, 24 Feb 2025 13:24:14 +0200 Subject: [PATCH 2/8] Add some comments --- src/climatebenchpress/data_loader/canon.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/climatebenchpress/data_loader/canon.py b/src/climatebenchpress/data_loader/canon.py index dcc4d01..1712c0e 100644 --- a/src/climatebenchpress/data_loader/canon.py +++ b/src/climatebenchpress/data_loader/canon.py @@ -23,6 +23,8 @@ def _ensure_axis(da: xr.DataArray, c: str) -> tuple[xr.DataArray, str]: def canonicalize_variable(da: xr.DataArray) -> xr.DataArray: + # It makes little sense to invent every coordinate, so keep + # zero-dimensional variables as-is if len(da.dims) == 0: return da @@ -36,6 +38,8 @@ def canonicalize_variable(da: xr.DataArray) -> xr.DataArray: new_dims = [realization, time, vertical, latitude, longitude] + # Some variables contain other dimensions (e.g. DIM_bnds), + # let's not touch these' if not all(d in new_dims for d in da.dims): return da_old From b003bbbb5a57df86ba431ab35cefb64f5e92164c Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Thu, 27 Feb 2025 15:55:45 +0000 Subject: [PATCH 3/8] Biomass Dataset with HTTP download --- pyproject.toml | 1 + src/climatebenchpress/data_loader/__init__.py | 27 ++-- .../data_loader/datasets/esa_biomass_cci.py | 117 ++++++++++++++++-- 3 files changed, 125 insertions(+), 20 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4566ab1..95e25a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "cftime~=1.6.0", "dask~=2024.12.0", "fsspec~=2024.10.0", + "requests>=2.32.3", "typed-classproperties~=1.1.0", "xarray~=2024.11.0", "zarr~=2.18.0", diff --git a/src/climatebenchpress/data_loader/__init__.py b/src/climatebenchpress/data_loader/__init__.py index 11e01b1..dcc1f90 100644 --- a/src/climatebenchpress/data_loader/__init__.py +++ b/src/climatebenchpress/data_loader/__init__.py @@ -6,6 +6,7 @@ ] from pathlib import Path +from typing import Optional import xarray as xr @@ -20,16 +21,17 @@ def open_downloaded_canonicalized_dataset( ) -> xr.Dataset: datasets = basepath / "datasets" - download = datasets / cls.name / "download.zarr" + download = datasets / cls.name / "download" if not download.exists(): - ds = cls.open() - - with monitor.progress_bar(progress): - ds.to_zarr(download, encoding=dict(), compute=False).compute() + download.mkdir(parents=True, exist_ok=True) + # The download function is responsible for checking whether the download is + # complete or not. If the previous download was interrupt it will resume the download. + # If the download is complete it will skip the download. + cls.download(download) standardized = datasets / cls.name / "standardized.zarr" if not standardized.exists(): - ds = xr.open_dataset(download, chunks=dict(), engine="zarr") + ds = cls.open(download) ds = canon.canonicalize_dataset(ds) with monitor.progress_bar(progress): @@ -42,21 +44,20 @@ def open_downloaded_tiny_canonicalized_dataset( cls: type[Dataset], basepath: Path = Path(), progress: bool = True, + slices: Optional[dict[str, slice]] = None, ) -> xr.Dataset: datasets = basepath / "datasets" - download = datasets / f"{cls.name}-tiny" / "download.zarr" + download = datasets / f"{cls.name}" / "download" if not download.exists(): - ds = cls.open() - ds = canon.canonical_tiny_dataset(ds) - - with monitor.progress_bar(progress): - ds.to_zarr(download, encoding=dict(), compute=False).compute() + download.mkdir(parents=True, exist_ok=True) + cls.download(download) standardized = datasets / f"{cls.name}-tiny" / "standardized.zarr" if not standardized.exists(): - ds = xr.open_dataset(download, chunks=dict(), engine="zarr") + ds = cls.open(download) ds = canon.canonicalize_dataset(ds) + ds = canon.canonical_tiny_dataset(ds, slices=slices) with monitor.progress_bar(progress): ds.to_zarr(standardized, encoding=dict(), compute=False).compute() diff --git a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py index c7db5d2..c94b98b 100644 --- a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py +++ b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py @@ -1,32 +1,135 @@ __all__ = ["EsaBiomassCciDataset"] +import time +from pathlib import Path + import kerchunk.hdf +import requests import xarray as xr -from .abc import Dataset from .. import ( open_downloaded_canonicalized_dataset, open_downloaded_tiny_canonicalized_dataset, ) +from .abc import Dataset + +NUM_RETRIES = 3 +# Approximate bounding box coordinates for mainland France +FRANCE_BBOX = { + # "latitude": slice(51.5, 41.0), # North to South + # "longitude": slice(-5.5, 10.0), # West to East + "latitude": slice(200, 300), # North to South + "longitude": slice(300, 400), # West to East +} + + +def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: + """ + Download a large NetCDF file from a given URL. Ensures that download can be + resumed if it is interrupted due to network failures. + + Args: + url (str): URL of the NetCDF file + output_path (str): Local path to save the file + chunk_size (int): Size of chunks to download at a time in bytes + + Returns: + bool: True if download was successful, False otherwise + """ + session = requests.Session() + + # Check if file exists and get its size for resume capability + file_size = 0 + headers = {} + if output_path.exists(): + file_size = output_path.stat().st_size + headers["Range"] = f"bytes={file_size}-" + + try: + response = session.get(url, headers=headers, stream=True, timeout=30) + + # Handle finished, resume, or new download + if response.status_code == 416: + # HTTP response means that requested range not satisfiable + # We're assuming this to mean that the file is already downloaded. + # NOTE: Ideally we would compare the actual file size with the expected size + # or even better compare some checksums. + print(f"File already downloaded: {output_path}") + return True + elif file_size > 0 and response.status_code == 206: + mode = "ab" # Append in binary mode + else: + mode = "wb" # Write in binary mode + file_size = 0 + + total_size = int(response.headers.get("content-length", 0)) + file_size + + print(f"Downloading {url} to {output_path}") + print(f"File size: {total_size / 1e6:.2f} MB") + + with open(output_path, mode) as f: + downloaded = file_size + start_time = time.time() + + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + downloaded += len(chunk) + + # Calculate and display progress + elapsed_time = time.time() - start_time + if elapsed_time > 0: + speed = downloaded / (1e6 * elapsed_time) + percent = (downloaded / total_size) * 100 + print( + f"\rProgress: {percent:.2f}% - {downloaded / 1e6:.2f} MB - {speed:.2f} MB/s", + end="", + flush=True, + ) + + print("\nDownload completed!") + + except (requests.exceptions.RequestException, IOError) as e: + print(f"An error occurred: {e}") + print("You can resume the download by running the script again.") + return False + + return True class EsaBiomassCciDataset(Dataset): name = "esa-biomass-cci" @staticmethod - def open() -> xr.Dataset: + def download(download_path: Path): + # urls = [ + # f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" + # for year in [2010, 2015, 2016, 2017, 2018, 2019, 2020, 2021] + # ] urls = [ - f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" - for year in [2010, 2015, 2016, 2017, 2018, 2019, 2020, 2021] + "https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-10000m-fv5.01.nc" ] + for url in urls: + output_path = download_path / Path(url).name + for _ in range(NUM_RETRIES): + success = _download_netcdf(url, output_path) + if success: + break + if not success: + print(f"Failed to download {url}") + return + @staticmethod + def open(download_path: Path) -> xr.Dataset: + files = list(download_path.glob("*.nc")) kcs = [ kerchunk.hdf.SingleHdf5ToZarr( - url, + str(file), inline_threshold=0, error="raise", + storage_options={"timeout": 30}, ).translate() - for url in urls + for file in files ] dss = [ @@ -47,7 +150,7 @@ def open() -> xr.Dataset: if __name__ == "__main__": ds = open_downloaded_canonicalized_dataset(EsaBiomassCciDataset) - open_downloaded_tiny_canonicalized_dataset(EsaBiomassCciDataset) + open_downloaded_tiny_canonicalized_dataset(EsaBiomassCciDataset, slices=FRANCE_BBOX) for v, da in ds.items(): print(f"- {v}: {da.dims}") From 63473438ac9e32e9e1c9a42ec69c722a3d89a6e4 Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Thu, 27 Feb 2025 16:38:35 +0000 Subject: [PATCH 4/8] France bounding box --- .../data_loader/datasets/esa_biomass_cci.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py index c94b98b..b7b3693 100644 --- a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py +++ b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py @@ -14,13 +14,8 @@ from .abc import Dataset NUM_RETRIES = 3 -# Approximate bounding box coordinates for mainland France -FRANCE_BBOX = { - # "latitude": slice(51.5, 41.0), # North to South - # "longitude": slice(-5.5, 10.0), # West to East - "latitude": slice(200, 300), # North to South - "longitude": slice(300, 400), # West to East -} +# Approximate bounding box for mainland France +FRANCE_BBOX = {"X": slice(157500, 157500), "Y": slice(196313, 213750)} def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: @@ -102,12 +97,10 @@ class EsaBiomassCciDataset(Dataset): @staticmethod def download(download_path: Path): - # urls = [ - # f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" - # for year in [2010, 2015, 2016, 2017, 2018, 2019, 2020, 2021] - # ] urls = [ - "https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-10000m-fv5.01.nc" + f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" + # Restrict to 2 years for now for smaller download. + for year in [2010, 2015] ] for url in urls: output_path = download_path / Path(url).name @@ -127,7 +120,6 @@ def open(download_path: Path) -> xr.Dataset: str(file), inline_threshold=0, error="raise", - storage_options={"timeout": 30}, ).translate() for file in files ] From ae4a8c1acf1e8bd7f6b3f7599389c5e598fed7a1 Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Fri, 28 Feb 2025 11:42:39 +0000 Subject: [PATCH 5/8] Remove bug in slices for tiny dataset --- src/climatebenchpress/data_loader/__init__.py | 16 +++++++++++++++- .../data_loader/datasets/esa_biomass_cci.py | 8 ++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/climatebenchpress/data_loader/__init__.py b/src/climatebenchpress/data_loader/__init__.py index dcc1f90..1914c76 100644 --- a/src/climatebenchpress/data_loader/__init__.py +++ b/src/climatebenchpress/data_loader/__init__.py @@ -14,6 +14,15 @@ from .datasets.abc import Dataset +def _rechunk_dataset(ds: xr.Dataset) -> xr.Dataset: + rechunked = ds.copy() + for var_name in ds.data_vars: + if hasattr(ds[var_name].data, "chunks"): + rechunked[var_name] = ds[var_name].chunk("auto") + + return rechunked + + def open_downloaded_canonicalized_dataset( cls: type[Dataset], basepath: Path = Path(), @@ -58,8 +67,13 @@ def open_downloaded_tiny_canonicalized_dataset( ds = cls.open(download) ds = canon.canonicalize_dataset(ds) ds = canon.canonical_tiny_dataset(ds, slices=slices) + # Rechunk the data because "tiny-fication" can lead to inconsistent or + # suboptimal chunking. + ds = _rechunk_dataset(ds) with monitor.progress_bar(progress): - ds.to_zarr(standardized, encoding=dict(), compute=False).compute() + ds.to_zarr( + standardized, encoding=dict(), compute=False, consolidated=True + ).compute() return xr.open_dataset(standardized, chunks=dict(), engine="zarr") diff --git a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py index b7b3693..37c7689 100644 --- a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py +++ b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py @@ -15,7 +15,7 @@ NUM_RETRIES = 3 # Approximate bounding box for mainland France -FRANCE_BBOX = {"X": slice(157500, 157500), "Y": slice(196313, 213750)} +FRANCE_BBOX = {"X": slice(196313, 213750), "Y": slice(32063, 43875)} def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: @@ -137,7 +137,11 @@ def open(download_path: Path) -> xr.Dataset: for kc in kcs ] - return xr.concat(dss, dim="time") + ds = xr.concat(dss, dim="time") + # Needed to make the dataset CF-compliant. + ds.lon.attrs["axis"] = "X" + ds.lat.attrs["axis"] = "Y" + return ds if __name__ == "__main__": From 5c54aa7ab2440d3e3cb19ff3ecc0303f83187fcd Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Mon, 3 Mar 2025 10:24:49 +0000 Subject: [PATCH 6/8] Remove kerchunk, move to logging and tqdm --- pyproject.toml | 7 +- .../data_loader/datasets/esa_biomass_cci.py | 150 ++++++++---------- 2 files changed, 67 insertions(+), 90 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 95e25a7..415281f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,15 +9,18 @@ dependencies = [ "cftime~=1.6.0", "dask~=2024.12.0", "fsspec~=2024.10.0", - "requests>=2.32.3", + "requests~=2.32.3", + "tqdm~=4.67.1", "typed-classproperties~=1.1.0", + "types-requests~=2.32.0.20241016", + "types-tqdm~=4.67.0.20250301", "xarray~=2024.11.0", "zarr~=2.18.0", ] optional-dependencies.data = [ "aiohttp~=3.11.0", - "kerchunk[hdf]~=0.2.0", + "netcdf4~=1.7.2", "pandas~=2.2.0", ] diff --git a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py index 37c7689..313f8a3 100644 --- a/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py +++ b/src/climatebenchpress/data_loader/datasets/esa_biomass_cci.py @@ -1,11 +1,11 @@ __all__ = ["EsaBiomassCciDataset"] -import time +import logging from pathlib import Path -import kerchunk.hdf import requests import xarray as xr +from tqdm import tqdm from .. import ( open_downloaded_canonicalized_dataset, @@ -18,7 +18,39 @@ FRANCE_BBOX = {"X": slice(196313, 213750), "Y": slice(32063, 43875)} -def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: +class EsaBiomassCciDataset(Dataset): + name = "esa-biomass-cci-test" + + @staticmethod + def download(download_path: Path, progress: bool = True): + urls = [ + f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" + # Restrict to 2 years for now for smaller download. + for year in [2010, 2015] + ] + for url in urls: + output_path = download_path / Path(url).name + for _ in range(NUM_RETRIES): + success = _download_netcdf(url, output_path, progress) + if success: + break + if not success: + logging.info(f"Failed to download {url}") + return + + @staticmethod + def open(download_path: Path) -> xr.Dataset: + # Need string conversion for argument to be interpreted as a glob pattern. + ds = xr.open_mfdataset(str(download_path / "*.nc")) + # Needed to make the dataset CF-compliant. + ds.lon.attrs["axis"] = "X" + ds.lat.attrs["axis"] = "Y" + return ds + + +def _download_netcdf( + url: str, output_path: Path, progress: bool, chunk_size=8192 +) -> bool: """ Download a large NetCDF file from a given URL. Ensures that download can be resumed if it is interrupted due to network failures. @@ -31,6 +63,11 @@ def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: Returns: bool: True if download was successful, False otherwise """ + donefile = output_path.with_name(output_path.name + ".done") + if donefile.exists(): + logging.debug(f"File already downloaded: {output_path}") + return True + session = requests.Session() # Check if file exists and get its size for resume capability @@ -43,15 +80,8 @@ def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: try: response = session.get(url, headers=headers, stream=True, timeout=30) - # Handle finished, resume, or new download - if response.status_code == 416: - # HTTP response means that requested range not satisfiable - # We're assuming this to mean that the file is already downloaded. - # NOTE: Ideally we would compare the actual file size with the expected size - # or even better compare some checksums. - print(f"File already downloaded: {output_path}") - return True - elif file_size > 0 and response.status_code == 206: + # Handle resume or new download + if file_size > 0 and response.status_code == 206: mode = "ab" # Append in binary mode else: mode = "wb" # Write in binary mode @@ -59,91 +89,35 @@ def _download_netcdf(url: str, output_path: Path, chunk_size=8192) -> bool: total_size = int(response.headers.get("content-length", 0)) + file_size - print(f"Downloading {url} to {output_path}") - print(f"File size: {total_size / 1e6:.2f} MB") + logging.debug(f"Downloading {url} to {output_path} in mode '{mode}'") + logging.debug(f"File size: {total_size / 1e6:.2f} MB") with open(output_path, mode) as f: - downloaded = file_size - start_time = time.time() - - for chunk in response.iter_content(chunk_size=chunk_size): - if chunk: - f.write(chunk) - downloaded += len(chunk) - - # Calculate and display progress - elapsed_time = time.time() - start_time - if elapsed_time > 0: - speed = downloaded / (1e6 * elapsed_time) - percent = (downloaded / total_size) * 100 - print( - f"\rProgress: {percent:.2f}% - {downloaded / 1e6:.2f} MB - {speed:.2f} MB/s", - end="", - flush=True, - ) - - print("\nDownload completed!") + with tqdm( + total=total_size, + unit="B", + unit_scale=True, + desc=output_path.name, + initial=file_size, + ascii=True, + ) as pbar: + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + pbar.update(len(chunk)) + + logging.debug("\nDownload completed!") + + donefile.touch() except (requests.exceptions.RequestException, IOError) as e: - print(f"An error occurred: {e}") - print("You can resume the download by running the script again.") + logging.error(f"An error occurred: {e}") + logging.error("You can resume the download by running the script again.") return False return True -class EsaBiomassCciDataset(Dataset): - name = "esa-biomass-cci" - - @staticmethod - def download(download_path: Path): - urls = [ - f"https://dap.ceda.ac.uk/neodc/esacci/biomass/data/agb/maps/v5.01/netcdf/ESACCI-BIOMASS-L4-AGB-MERGED-100m-{year}-fv5.01.nc" - # Restrict to 2 years for now for smaller download. - for year in [2010, 2015] - ] - for url in urls: - output_path = download_path / Path(url).name - for _ in range(NUM_RETRIES): - success = _download_netcdf(url, output_path) - if success: - break - if not success: - print(f"Failed to download {url}") - return - - @staticmethod - def open(download_path: Path) -> xr.Dataset: - files = list(download_path.glob("*.nc")) - kcs = [ - kerchunk.hdf.SingleHdf5ToZarr( - str(file), - inline_threshold=0, - error="raise", - ).translate() - for file in files - ] - - dss = [ - xr.open_dataset( - "reference://", - engine="zarr", - backend_kwargs=dict( - storage_options=dict(fo=kc), - ), - consolidated=False, - chunks=dict(), - ) - for kc in kcs - ] - - ds = xr.concat(dss, dim="time") - # Needed to make the dataset CF-compliant. - ds.lon.attrs["axis"] = "X" - ds.lat.attrs["axis"] = "Y" - return ds - - if __name__ == "__main__": ds = open_downloaded_canonicalized_dataset(EsaBiomassCciDataset) open_downloaded_tiny_canonicalized_dataset(EsaBiomassCciDataset, slices=FRANCE_BBOX) From eda066b271e7266c96a6dc1a1069d7aef3fab656 Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Mon, 3 Mar 2025 10:25:40 +0000 Subject: [PATCH 7/8] Make full repository compatible with new Dataset --- datasets/.gitignore | 2 +- src/climatebenchpress/data_loader/__init__.py | 22 ++++++------- .../data_loader/datasets/abc.py | 8 ++++- .../data_loader/datasets/cmip6/abc.py | 32 ++++++++++++++++--- .../datasets/cmip6/access_atmos.py | 8 +++-- .../datasets/cmip6/access_ocean.py | 12 ++++--- .../datasets/cmip6/canesm5_atmos.py | 8 +++-- .../datasets/cmip6/canesm5_ocean.py | 11 ++++--- .../data_loader/datasets/cmip6/ukesm_atmos.py | 8 +++-- .../data_loader/datasets/cmip6/ukesm_ocean.py | 11 ++++--- .../data_loader/datasets/era5.py | 18 +++++++++-- tests/test_virtual.py | 20 ++++++++++-- 12 files changed, 116 insertions(+), 44 deletions(-) diff --git a/datasets/.gitignore b/datasets/.gitignore index f4b6608..c99347c 100644 --- a/datasets/.gitignore +++ b/datasets/.gitignore @@ -1,2 +1,2 @@ -/*/download.zarr +/*/download /*/standardized.zarr diff --git a/src/climatebenchpress/data_loader/__init__.py b/src/climatebenchpress/data_loader/__init__.py index 1914c76..7a98367 100644 --- a/src/climatebenchpress/data_loader/__init__.py +++ b/src/climatebenchpress/data_loader/__init__.py @@ -14,15 +14,6 @@ from .datasets.abc import Dataset -def _rechunk_dataset(ds: xr.Dataset) -> xr.Dataset: - rechunked = ds.copy() - for var_name in ds.data_vars: - if hasattr(ds[var_name].data, "chunks"): - rechunked[var_name] = ds[var_name].chunk("auto") - - return rechunked - - def open_downloaded_canonicalized_dataset( cls: type[Dataset], basepath: Path = Path(), @@ -36,7 +27,7 @@ def open_downloaded_canonicalized_dataset( # The download function is responsible for checking whether the download is # complete or not. If the previous download was interrupt it will resume the download. # If the download is complete it will skip the download. - cls.download(download) + cls.download(download, progress) standardized = datasets / cls.name / "standardized.zarr" if not standardized.exists(): @@ -60,7 +51,7 @@ def open_downloaded_tiny_canonicalized_dataset( download = datasets / f"{cls.name}" / "download" if not download.exists(): download.mkdir(parents=True, exist_ok=True) - cls.download(download) + cls.download(download, progress) standardized = datasets / f"{cls.name}-tiny" / "standardized.zarr" if not standardized.exists(): @@ -77,3 +68,12 @@ def open_downloaded_tiny_canonicalized_dataset( ).compute() return xr.open_dataset(standardized, chunks=dict(), engine="zarr") + + +def _rechunk_dataset(ds: xr.Dataset) -> xr.Dataset: + rechunked = ds.copy() + for var_name in ds.data_vars: + if hasattr(ds[var_name].data, "chunks"): + rechunked[var_name] = ds[var_name].chunk("auto") + + return rechunked diff --git a/src/climatebenchpress/data_loader/datasets/abc.py b/src/climatebenchpress/data_loader/datasets/abc.py index f1d11d4..0075a25 100644 --- a/src/climatebenchpress/data_loader/datasets/abc.py +++ b/src/climatebenchpress/data_loader/datasets/abc.py @@ -3,6 +3,7 @@ from abc import ABC, abstractmethod from collections.abc import Mapping from inspect import isabstract +from pathlib import Path from types import MappingProxyType import xarray as xr @@ -15,7 +16,12 @@ class Dataset(ABC): @staticmethod @abstractmethod - def open() -> xr.Dataset: + def download(download_path: Path, progress: bool = True): + pass + + @staticmethod + @abstractmethod + def open(download_path: Path) -> xr.Dataset: pass # Class interface diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/abc.py b/src/climatebenchpress/data_loader/datasets/cmip6/abc.py index 12b027c..b42855d 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/abc.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/abc.py @@ -5,11 +5,14 @@ ] from functools import lru_cache +from pathlib import Path +from typing import Optional import fsspec import pandas as pd import xarray as xr +from ... import monitor from ..abc import Dataset @@ -20,9 +23,20 @@ class Cmip6Dataset(Dataset): table_id: str @staticmethod - def open_with( - model_id: str, ssp_id: str, variable_id: str, table_id: str - ) -> xr.Dataset: + def download_with( + download_path: Path, + model_id: str, + ssp_id: str, + variable_id: str, + table_id: str, + variable_selector: Optional[list[str]] = None, + progress: bool = True, + ): + downloadfile = download_path / "download.zarr" + donefile = downloadfile.parent / (downloadfile.name + ".done") + if donefile.exists(): + return + df = Cmip6Dataset.get_stores() df_ta = df.query( @@ -33,7 +47,17 @@ def open_with( zstore = df_ta.zstore.values[-1] zstore = zstore.replace("gs://", "https://storage.googleapis.com/") - return xr.open_zarr(fsspec.get_mapper(zstore), consolidated=True) + ds = xr.open_zarr(fsspec.get_mapper(zstore), consolidated=True) + if variable_selector is not None: + ds = ds[variable_selector] + with monitor.progress_bar(progress): + ds.to_zarr(downloadfile, mode="w", encoding=dict(), compute=False).compute() + + donefile.touch() + + @staticmethod + def open(download_path: Path) -> xr.Dataset: + return xr.open_zarr(download_path / "download.zarr") @lru_cache @staticmethod diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/access_atmos.py b/src/climatebenchpress/data_loader/datasets/cmip6/access_atmos.py index 1db95c7..3c78723 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/access_atmos.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/access_atmos.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6AtmosphereAccessDataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,12 +16,14 @@ class Cmip6AtmosphereAccessDataset(Cmip6AtmosphereDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - return Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6AtmosphereAccessDataset.model_id, Cmip6AtmosphereAccessDataset.ssp_id, Cmip6AtmosphereAccessDataset.variable_id, Cmip6AtmosphereAccessDataset.table_id, + progress=progress, ) diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/access_ocean.py b/src/climatebenchpress/data_loader/datasets/cmip6/access_ocean.py index 3034e90..4198347 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/access_ocean.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/access_ocean.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6OceanAccessDataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,15 +16,17 @@ class Cmip6OceanAccessDataset(Cmip6OceanDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - ds = Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6OceanAccessDataset.model_id, Cmip6OceanAccessDataset.ssp_id, Cmip6OceanAccessDataset.variable_id, Cmip6OceanAccessDataset.table_id, + # Only download the actual sea surface temperature. + variable_selector=["tos"], + progress=progress, ) - # Only keep the actual sea surface temperature. - return ds[["tos"]] if __name__ == "__main__": diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_atmos.py b/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_atmos.py index 50f72b9..600ef42 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_atmos.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_atmos.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6AtmosphereCanEsm5Dataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,12 +16,14 @@ class Cmip6AtmosphereCanEsm5Dataset(Cmip6AtmosphereDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - return Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6AtmosphereCanEsm5Dataset.model_id, Cmip6AtmosphereCanEsm5Dataset.ssp_id, Cmip6AtmosphereCanEsm5Dataset.variable_id, Cmip6AtmosphereCanEsm5Dataset.table_id, + progress=progress, ) diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_ocean.py b/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_ocean.py index 4a3690b..87ddcb0 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_ocean.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/canesm5_ocean.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6OceanCanEsm5Dataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,14 +16,17 @@ class Cmip6OceanCanEsm5Dataset(Cmip6OceanDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - ds = Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6OceanCanEsm5Dataset.model_id, Cmip6OceanCanEsm5Dataset.ssp_id, Cmip6OceanCanEsm5Dataset.variable_id, Cmip6OceanCanEsm5Dataset.table_id, + # Only download the actual sea surface temperature. + variable_selector=["tos"], + progress=progress, ) - return ds[["tos"]] if __name__ == "__main__": diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_atmos.py b/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_atmos.py index 5d601ec..5b3ed2f 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_atmos.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_atmos.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6AtmosphereUkEsmDataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,12 +16,14 @@ class Cmip6AtmosphereUkEsmDataset(Cmip6AtmosphereDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - return Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6AtmosphereUkEsmDataset.model_id, Cmip6AtmosphereUkEsmDataset.ssp_id, Cmip6AtmosphereUkEsmDataset.variable_id, Cmip6AtmosphereUkEsmDataset.table_id, + progress=progress, ) diff --git a/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_ocean.py b/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_ocean.py index 6f3fcf5..b306388 100644 --- a/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_ocean.py +++ b/src/climatebenchpress/data_loader/datasets/cmip6/ukesm_ocean.py @@ -1,6 +1,6 @@ __all__ = ["Cmip6OceanUkEsmDataset"] -import xarray as xr +from pathlib import Path from ... import ( open_downloaded_canonicalized_dataset, @@ -16,14 +16,17 @@ class Cmip6OceanUkEsmDataset(Cmip6OceanDataset): ssp_id = "ssp585" @staticmethod - def open() -> xr.Dataset: - ds = Cmip6Dataset.open_with( + def download(download_path: Path, progress: bool = True): + Cmip6Dataset.download_with( + download_path, Cmip6OceanUkEsmDataset.model_id, Cmip6OceanUkEsmDataset.ssp_id, Cmip6OceanUkEsmDataset.variable_id, Cmip6OceanUkEsmDataset.table_id, + # Only download the actual sea surface temperature. + variable_selector=["tos"], + progress=progress, ) - return ds[["tos"]] if __name__ == "__main__": diff --git a/src/climatebenchpress/data_loader/datasets/era5.py b/src/climatebenchpress/data_loader/datasets/era5.py index 5bf1427..2fbdc93 100644 --- a/src/climatebenchpress/data_loader/datasets/era5.py +++ b/src/climatebenchpress/data_loader/datasets/era5.py @@ -1,8 +1,11 @@ __all__ = ["Era5Dataset"] +from pathlib import Path + import xarray as xr from .. import ( + monitor, open_downloaded_canonicalized_dataset, open_downloaded_tiny_canonicalized_dataset, ) @@ -15,7 +18,12 @@ class Era5Dataset(Dataset): name = "era5" @staticmethod - def open() -> xr.Dataset: + def download(download_path: Path, progress: bool = True): + downloadfile = download_path / "download.zarr" + donefile = downloadfile.parent / (downloadfile.name + ".done") + if donefile.exists(): + return + era5 = xr.open_zarr(ERA5_GCP_PATH, chunks={"time": 48}, consolidated=True) ds = era5.sel(time=slice("2020-03-01", "2020-03-07"))[ @@ -29,7 +37,13 @@ def open() -> xr.Dataset: ds.time.attrs["standard_name"] = "time" ds.longitude.attrs["axis"] = "X" ds.latitude.attrs["axis"] = "Y" - return ds + with monitor.progress_bar(progress): + ds.to_zarr(downloadfile, mode="w", encoding=dict(), compute=False).compute() + donefile.touch() + + @staticmethod + def open(download_path: Path) -> xr.Dataset: + return xr.open_zarr(download_path / "download.zarr") if __name__ == "__main__": diff --git a/tests/test_virtual.py b/tests/test_virtual.py index 9e33fde..3820b94 100644 --- a/tests/test_virtual.py +++ b/tests/test_virtual.py @@ -1,5 +1,8 @@ +from pathlib import Path + import climatebenchpress.data_loader import climatebenchpress.data_loader.datasets.abc +import climatebenchpress.data_loader.monitor import fsspec import xarray as xr from upath import UPath @@ -15,7 +18,7 @@ def test_virtual_download(): ) assert ds.t.shape == (1, 1, 1, 2, 2) - assert (basepath / "datasets" / "test" / "download.zarr").exists() + assert (basepath / "datasets" / "test" / "download" / "download.zarr").exists() assert (basepath / "datasets" / "test" / "standardized.zarr").exists() @@ -23,8 +26,8 @@ class TestDataset(climatebenchpress.data_loader.datasets.abc.Dataset): name = "test" @staticmethod - def open() -> xr.Dataset: - return xr.Dataset( + def download(download_path: Path, progress: bool = True): + ds = xr.Dataset( { "t": (("lat", "lon"), [[1, 2], [3, 4]]), }, @@ -33,3 +36,14 @@ def open() -> xr.Dataset: "lon": ("lon", [0, 180], {"standard_name": "longitude", "axis": "X"}), }, ) + with climatebenchpress.data_loader.monitor.progress_bar(progress): + ds.to_zarr( + download_path / "download.zarr", + mode="w", + encoding=dict(), + compute=False, + ).compute() + + @staticmethod + def open(download_path: Path) -> xr.Dataset: + return xr.open_zarr(download_path / "download.zarr") From 7a8f5b7938e211e0de382ceb26d0ce60394e0c9a Mon Sep 17 00:00:00 2001 From: Tim Reichelt Date: Mon, 3 Mar 2025 15:15:35 +0000 Subject: [PATCH 8/8] Move type stubs to dev dependencies --- pyproject.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 415281f..c75468b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,8 +12,6 @@ dependencies = [ "requests~=2.32.3", "tqdm~=4.67.1", "typed-classproperties~=1.1.0", - "types-requests~=2.32.0.20241016", - "types-tqdm~=4.67.0.20250301", "xarray~=2024.11.0", "zarr~=2.18.0", ] @@ -31,6 +29,8 @@ dev = [ "pandas-stubs~=2.2.0", "pytest~=8.3", "ruff~=0.8", + "types-requests~=2.32.0.20241016", + "types-tqdm~=4.67.0.20250301", "universal-pathlib~=0.2.0", ] @@ -44,5 +44,5 @@ where = ["src"] addopts = ["--import-mode=importlib"] [[tool.mypy.overrides]] -module = ["fsspec.*", "kerchunk.*"] +module = ["fsspec.*"] follow_untyped_imports = true