diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py
index 564ec89fe3..efdc27d2eb 100644
--- a/esmvalcore/preprocessor/_io.py
+++ b/esmvalcore/preprocessor/_io.py
@@ -304,7 +304,9 @@ def save(cubes,
            "The cube is probably unchanged.", cubes, filename)
         return filename
 
-    logger.debug("Saving cubes %s to %s", cubes, filename)
+    for cube in cubes:
+        logger.debug("Saving cube:\n%s\nwith %s data to %s", cube,
+                     "lazy" if cube.has_lazy_data() else "realized", filename)
     if optimize_access:
         cube = cubes[0]
         if optimize_access == 'map':
diff --git a/esmvalcore/preprocessor/_multimodel.py b/esmvalcore/preprocessor/_multimodel.py
index 9bb6d6e2af..0f862b0e90 100644
--- a/esmvalcore/preprocessor/_multimodel.py
+++ b/esmvalcore/preprocessor/_multimodel.py
@@ -177,7 +177,18 @@ def _map_to_new_time(cube, time_points):
     Missing data inside original bounds is filled with nearest neighbour
     Missing data outside original bounds is masked.
     """
-    time_points = cube.coord('time').units.num2date(time_points)
+    time_coord = cube.coord('time')
+
+    # Try if the required time points can be obtained by slicing the cube.
+    time_slice = np.isin(time_coord.points, time_points)
+    if np.any(time_slice) and np.array_equal(time_coord.points[time_slice],
+                                             time_points):
+        time_idx, = cube.coord_dims('time')
+        indices = tuple(time_slice if i == time_idx else slice(None)
+                        for i in range(cube.ndim))
+        return cube[indices]
+
+    time_points = time_coord.units.num2date(time_points)
     sample_points = [('time', time_points)]
     scheme = iris.analysis.Nearest(extrapolation_mode='mask')
 
@@ -505,43 +516,17 @@ def _compute_eager(
     """Compute statistics one slice at a time."""
     _ = [cube.data for cube in cubes]  # make sure the cubes' data are realized
 
-    result_slices = []
+    result_slices = iris.cube.CubeList()
     for chunk in _compute_slices(cubes):
         if chunk is None:
-            single_model_slices = cubes  # scalar cubes
+            input_slices = cubes  # scalar cubes
         else:
-            single_model_slices = [cube[chunk] for cube in cubes]
-        combined_slice = _combine(single_model_slices)
-        with warnings.catch_warnings():
-            warnings.filterwarnings(
-                'ignore',
-                message=(
-                    "Collapsing a non-contiguous coordinate. "
-                    f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
-                ),
-                category=UserWarning,
-                module='iris',
-            )
-            warnings.filterwarnings(
-                'ignore',
-                message=(
-                    f"Cannot check if coordinate is contiguous: Invalid "
-                    f"operation for '{CONCAT_DIM}'"
-                ),
-                category=UserWarning,
-                module='iris',
-            )
-            collapsed_slice = combined_slice.collapsed(CONCAT_DIM, operator,
-                                                       **kwargs)
-
-        # some iris aggregators modify dtype, see e.g.
-        # https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
-        collapsed_slice.data = collapsed_slice.data.astype(np.float32)
-
-        result_slices.append(collapsed_slice)
+            input_slices = [cube[chunk] for cube in cubes]
+        result_slice = _compute(input_slices, operator=operator, **kwargs)
+        result_slices.append(result_slice)
 
     try:
-        result_cube = CubeList(result_slices).concatenate_cube()
+        result_cube = result_slices.concatenate_cube()
     except Exception as excinfo:
         raise ValueError(
             f"Multi-model statistics failed to concatenate results into a "
@@ -551,7 +536,45 @@
             f"dtypes") from excinfo
 
     result_cube.data = np.ma.array(result_cube.data)
+
+    return result_cube
+
+
+def _compute(cubes: list, *, operator: iris.analysis.Aggregator, **kwargs):
+    """Compute statistic."""
+    cube = _combine(cubes)
+
+    with warnings.catch_warnings():
+        warnings.filterwarnings(
+            'ignore',
+            message=(
+                "Collapsing a non-contiguous coordinate. "
+                f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
+            ),
+            category=UserWarning,
+            module='iris',
+        )
+        warnings.filterwarnings(
+            'ignore',
+            message=(
+                f"Cannot check if coordinate is contiguous: Invalid "
+                f"operation for '{CONCAT_DIM}'"
+            ),
+            category=UserWarning,
+            module='iris',
+        )
+        # This will always return a masked array
+        result_cube = cube.collapsed(CONCAT_DIM, operator, **kwargs)
+
+    # Remove concatenation dimension added by _combine
     result_cube.remove_coord(CONCAT_DIM)
+    for cube in cubes:
+        cube.remove_coord(CONCAT_DIM)
+
+    # some iris aggregators modify dtype, see e.g.
+    # https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
+    result_cube.data = result_cube.core_data().astype(np.float32)
+
     if result_cube.cell_methods:
         cell_method = result_cube.cell_methods[0]
         result_cube.cell_methods = None
@@ -578,12 +601,12 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):
         )
 
     # Avoid modifying inputs
-    copied_cubes = [cube.copy() for cube in cubes]
+    cubes = [cube.copy() for cube in cubes]
 
     # Remove scalar coordinates in input cubes if desired to ignore them when
    # merging
     if ignore_scalar_coords:
-        for cube in copied_cubes:
+        for cube in cubes:
            for scalar_coord in cube.coords(dimensions=()):
                cube.remove_coord(scalar_coord)
                logger.debug(
@@ -595,11 +618,11 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):
 
     # If all cubes contain a time coordinate, align them. If no cube contains a
     # time coordinate, do nothing. Else, raise an exception.
-    time_coords = [cube.coords('time') for cube in copied_cubes]
+    time_coords = [cube.coords('time') for cube in cubes]
     if all(time_coords):
-        aligned_cubes = _align_time_coord(copied_cubes, span=span)
+        cubes = _align_time_coord(cubes, span=span)
     elif not any(time_coords):
-        aligned_cubes = copied_cubes
+        pass
     else:
         raise ValueError(
             "Multi-model statistics failed to merge input cubes into a single "
@@ -609,13 +632,14 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):
 
     # Calculate statistics
     statistics_cubes = {}
+    lazy_input = any(cube.has_lazy_data() for cube in cubes)
     for statistic in statistics:
         logger.debug('Multicube statistics: computing: %s', statistic)
         operator, kwargs = _resolve_operator(statistic)
-
-        result_cube = _compute_eager(aligned_cubes,
-                                     operator=operator,
-                                     **kwargs)
+        if lazy_input and operator.lazy_func is not None:
+            result_cube = _compute(cubes, operator=operator, **kwargs)
+        else:
+            result_cube = _compute_eager(cubes, operator=operator, **kwargs)
         statistics_cubes[statistic] = result_cube
 
     return statistics_cubes
@@ -720,6 +744,8 @@ def multi_model_statistics(products,
     arguments. Except for percentiles, these operators are currently not
     supported.
 
+    Lazy operation is supported for all statistics, except ``median``.
+
     Parameters
     ----------
     products: list
diff --git a/esmvalcore/preprocessor/_regrid.py b/esmvalcore/preprocessor/_regrid.py
index 4972a7ce8f..fe94744ffb 100644
--- a/esmvalcore/preprocessor/_regrid.py
+++ b/esmvalcore/preprocessor/_regrid.py
@@ -11,6 +11,7 @@
 from pathlib import Path
 from typing import Dict
 
+import dask.array as da
 import iris
 import numpy as np
 import stratify
@@ -650,11 +651,38 @@ def regrid(cube, target_grid, scheme, lat_offset=True, lon_offset=True):
         # scheme is a function f(src_cube, grid_cube) -> Cube
         cube = loaded_scheme
     else:
+        cube = _rechunk(cube, target_grid)
         cube = cube.regrid(target_grid, loaded_scheme)
 
     return cube
 
 
+def _rechunk(cube, target_grid):
+    """Re-chunk cube with optimal chunk sizes for target grid."""
+    if not cube.has_lazy_data() or cube.ndim < 3:
+        # Only rechunk lazy multidimensional data
+        return cube
+
+    if 2 * np.prod(cube.shape[-2:]) > np.prod(target_grid.shape):
+        # Only rechunk if target grid is more than a factor of 2 larger,
+        # because rechunking will keep the original chunk in memory.
+        return cube
+
+    data = cube.lazy_data()
+
+    # Compute a good chunk size for the target array
+    tgt_shape = data.shape[:-2] + target_grid.shape
+    tgt_chunks = data.chunks[:-2] + target_grid.shape
+    tgt_data = da.empty(tgt_shape, dtype=data.dtype, chunks=tgt_chunks)
+    tgt_data = tgt_data.rechunk({i: "auto" for i in range(cube.ndim - 2)})
+
+    # Adjust chunks to source array and rechunk
+    chunks = tgt_data.chunks[:-2] + data.shape[-2:]
+    cube.data = data.rechunk(chunks)
+
+    return cube
+
+
 def _horizontal_grid_is_close(cube1, cube2):
     """Check if two cubes have the same horizontal grid definition.
 
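Note (illustrative, not part of the patch): the chunk-size heuristic in `_rechunk` above can be reproduced with plain dask. The sketch below mirrors the shapes used in `test_rechunk_on_increased_grid` further down; the variable names are arbitrary.

    import dask
    import dask.array as da

    with dask.config.set({'array.chunk-size': '128 M'}):
        # Source: 246 time steps on a 91 x 180 grid, one chunk along time.
        data = da.empty((246, 91, 180), dtype='float32', chunks=(246, 91, 180))

        # Shape an empty array like the regridded result (361 x 720 grid) and
        # let dask pick an "auto" chunk size for the leading (time) axis.
        tgt = da.empty((246, 361, 720), dtype=data.dtype, chunks=(246, 361, 720))
        tgt = tgt.rechunk({0: 'auto'})

        # Apply the resulting time chunking to the source array.
        rechunked = data.rechunk(tgt.chunks[:-2] + data.shape[-2:])

    print(rechunked.chunks)  # ((123, 123), (91,), (180,))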
diff --git a/tests/sample_data/multimodel_statistics/test_multimodel.py b/tests/sample_data/multimodel_statistics/test_multimodel.py
index 28b7640e6c..4c40d94875 100644
--- a/tests/sample_data/multimodel_statistics/test_multimodel.py
+++ b/tests/sample_data/multimodel_statistics/test_multimodel.py
@@ -23,12 +23,12 @@
 SPAN_PARAMS = ('overlap', 'full')
 
 
-def assert_array_almost_equal(this, other):
+def assert_array_almost_equal(this, other, rtol=1e-7):
     """Assert that array `this` almost equals array `other`."""
     if np.ma.isMaskedArray(this) or np.ma.isMaskedArray(other):
         np.testing.assert_array_equal(this.mask, other.mask)
 
-    np.testing.assert_allclose(this, other)
+    np.testing.assert_allclose(this, other, rtol=rtol)
 
 
 def assert_coords_equal(this: list, other: list):
@@ -188,7 +188,7 @@ def multimodel_regression_test(cubes, span, name):
     if filename.exists():
         reference_cube = iris.load_cube(str(filename))
 
-        assert_array_almost_equal(result_cube.data, reference_cube.data)
+        assert_array_almost_equal(result_cube.data, reference_cube.data, 5e-7)
         assert_metadata_equal(result_cube.metadata, reference_cube.metadata)
         assert_coords_equal(result_cube.coords(), reference_cube.coords())
 
diff --git a/tests/unit/preprocessor/_multimodel/test_multimodel.py b/tests/unit/preprocessor/_multimodel/test_multimodel.py
index 55326ab642..4a0e353e1a 100644
--- a/tests/unit/preprocessor/_multimodel/test_multimodel.py
+++ b/tests/unit/preprocessor/_multimodel/test_multimodel.py
@@ -340,7 +340,6 @@ def test_multimodel_statistics(frequency, span, statistics, expected):
         assert_array_allclose(result_cube.data, expected_data)
 
 
-@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
 @pytest.mark.parametrize('span', SPAN_OPTIONS)
 def test_lazy_data_consistent_times(span):
     """Test laziness of multimodel statistics with consistent time axis."""
@@ -362,7 +361,6 @@
     assert result_cube.has_lazy_data()
 
 
-@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
 @pytest.mark.parametrize('span', SPAN_OPTIONS)
 def test_lazy_data_inconsistent_times(span):
     """Test laziness of multimodel statistics with inconsistent time axis.
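Note (illustrative, not part of the patch): the two ``xfail`` markers removed above can go because `_multicube_statistics` now takes the lazy path whenever the input has lazy data and the iris aggregator provides a lazy implementation (the ``operator.lazy_func is not None`` check in `_multimodel.py`). Which operators qualify can be inspected directly; with the iris versions targeted here, ``median`` has no lazy counterpart, hence the docstring note added above.

    import iris.analysis

    for operator in (iris.analysis.MEAN, iris.analysis.STD_DEV, iris.analysis.MEDIAN):
        # Aggregators without a lazy_func fall back to the eager slice-by-slice path.
        print(operator.name(), 'supports lazy aggregation:', operator.lazy_func is not None)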
diff --git a/tests/unit/preprocessor/_regrid/test_regrid.py b/tests/unit/preprocessor/_regrid/test_regrid.py
index 17a3a5fc29..9a73835518 100644
--- a/tests/unit/preprocessor/_regrid/test_regrid.py
+++ b/tests/unit/preprocessor/_regrid/test_regrid.py
@@ -4,6 +4,8 @@
 import unittest
 from unittest import mock
 
+import dask
+import dask.array as da
 import iris
 import numpy as np
 import pytest
@@ -14,6 +16,7 @@
     _CACHE,
     HORIZONTAL_SCHEMES,
     _horizontal_grid_is_close,
+    _rechunk,
 )
 
 
@@ -65,6 +68,7 @@ def setUp(self):
             regrid=self.regrid,
             dtype=float,
         )
+        self.src_cube.ndim = 1
         self.tgt_grid_coord = mock.Mock()
         self.tgt_grid = mock.Mock(
             spec=iris.cube.Cube, coord=self.tgt_grid_coord)
@@ -262,5 +266,60 @@ def test_regrid_is_skipped_if_grids_are_the_same():
     assert expected_different_cube is not cube
 
 
+def test_rechunk_on_increased_grid():
+    """Test that an increase in grid size rechunks."""
+    with dask.config.set({'array.chunk-size': '128 M'}):
+
+        time_dim = 246
+        src_grid_dims = (91, 180)
+        data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)
+
+        tgt_grid_dims = (361, 720)
+        tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)
+
+        result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))
+
+    assert result.core_data().chunks == ((123, 123), (91, ), (180, ))
+
+
+def test_no_rechunk_on_decreased_grid():
+    """Test that a decrease in grid size does not rechunk."""
+    with dask.config.set({'array.chunk-size': '128 M'}):
+
+        time_dim = 200
+        src_grid_dims = (361, 720)
+        data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)
+
+        tgt_grid_dims = (91, 180)
+        tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)
+
+        result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))
+
+    assert result.core_data().chunks == data.chunks
+
+
+def test_no_rechunk_2d():
+    """Test that a 2D cube is not rechunked."""
+    with dask.config.set({'array.chunk-size': '64 MiB'}):
+
+        src_grid_dims = (361, 720)
+        data = da.empty(src_grid_dims, dtype=np.float32)
+
+        tgt_grid_dims = (3601, 7200)
+        tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)
+
+        result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))
+
+    assert result.core_data().chunks == data.chunks
+
+
+def test_no_rechunk_non_lazy():
+    """Test that a cube with non-lazy data does not crash."""
+    cube = iris.cube.Cube(np.arange(2 * 4).reshape([1, 2, 4]))
+    tgt_cube = iris.cube.Cube(np.arange(4 * 8).reshape([4, 8]))
+    result = _rechunk(cube, tgt_cube)
+    assert result.data is cube.data
+
+
 if __name__ == '__main__':
     unittest.main()
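Note (illustrative, not part of the patch): the expected chunks in `test_rechunk_on_increased_grid` follow directly from the configured chunk-size limit. One 361 x 720 float32 field of the target grid takes 361 * 720 * 4 bytes (about 1.04 MB), so a 128 MB limit allows 123 time steps per chunk, and the 246 steps split evenly into two chunks of 123:

    slice_bytes = 361 * 720 * 4                     # one float32 field on the target grid
    steps_per_chunk = int(128e6 // slice_bytes)
    print(steps_per_chunk, 246 // steps_per_chunk)  # 123 2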