diff --git a/doc/recipe/preprocessor.rst b/doc/recipe/preprocessor.rst index e47284dda5..8fe97ae41a 100644 --- a/doc/recipe/preprocessor.rst +++ b/doc/recipe/preprocessor.rst @@ -15,6 +15,7 @@ roughly following the default order in which preprocessor functions are applied: * :ref:`Land/Sea/Ice masking` * :ref:`Horizontal regridding` * :ref:`Masking of missing values` +* :ref:`Ensemble statistics` * :ref:`Multi-model statistics` * :ref:`Time operations` * :ref:`Area operations` @@ -864,6 +865,65 @@ See also :func:`esmvalcore.preprocessor.regrid` for resolutions of ``< 0.5`` degrees the regridding becomes very slow and will use a lot of memory. +.. _ensemble statistics: + +Ensemble statistics +=================== +For certain use cases it may be desirable to compute ensemble statistics. For +example to prevent models with many ensemble members getting excessive weight in +the multi-model statistics functions. + +Theoretically, ensemble statistics are a special case (grouped) multi-model +statistics. This grouping is performed taking into account the dataset tags +`project`, `dataset`, `experiment`, and (if present) `sub_experiment`. +However, they should typically be computed earlier in the workflow. +Moreover, because multiple ensemble members of the same model are typically more +consistent/homogeneous than datasets from different models, the implementation +is more straigtforward and can benefit from lazy evaluation and more efficient +computation. + +The preprocessor takes a list of statistics as input: + +.. code-block:: yaml + + preprocessors: + example_preprocessor: + ensemble_statistics: + statistics: [mean, median] + +This preprocessor function exposes the iris analysis package, and works with all +(capitalized) statistics from the :mod:`iris.analysis` package +that can be executed without additional arguments (e.g. percentiles are not +supported because it requires additional keywords: percentile.). 
+ +Note that ``ensemble_statistics`` will not return the single model and ensemble files, +only the requested ensemble statistics results. + +In case of wanting to save both individual ensemble members as well as the statistic results, +the preprocessor chains could be defined as: + +.. code-block:: yaml + + preprocessors: + everything_else: &everything_else + area_statistics: ... + regrid_time: ... + multimodel: + <<: *everything_else + ensemble_statistics: + + variables: + tas_datasets: + short_name: tas + preprocessor: everything_else + ... + tas_multimodel: + short_name: tas + preprocessor: multimodel + ... + + +See also :func:`esmvalcore.preprocessor.ensemble_statistics`. .. _multi-model statistics: @@ -893,32 +953,78 @@ across overlapping times only (``span: overlap``) or across the full time span of the combined models (``span: full``). The preprocessor sets a common time coordinate on all datasets. As the number of days in a year may vary between calendars, (sub-)daily data with different calendars are not supported. +The preprocessor saves both the input single model files as well as the multi-model +results. In case you do not want to keep the single model files, set the +parameter ``keep_input_datasets`` to ``false`` (default value is ``true``). + +.. code-block:: yaml + + preprocessors: + multi_model_save_input: + multi_model_statistics: + span: overlap + statistics: [mean, median] + exclude: [NCEP] + multi_model_without_saving_input: + multi_model_statistics: + span: overlap + statistics: [mean, median] + exclude: [NCEP] + keep_input_datasets: false Input datasets may have different time coordinates. The multi-model statistics preprocessor sets a common time coordinate on all datasets. As the number of days in a year may vary between calendars, (sub-)daily data are not supported. +Multi-model statistics also supports a ``groupby`` argument. You can group by +any dataset key (``project``, ``experiment``, etc.) or a combination of keys in a list. 
You can +also add an arbitrary tag to a dataset definition and then group by that tag. When +using this preprocessor in conjunction with `ensemble statistics`_ preprocessor, you +can group by ``ensemble_statistics`` as well. For example: + .. code-block:: yaml + datasets: + - {dataset: CanESM2, exp: historical, ensemble: "r(1:2)i1p1"} + - {dataset: CCSM4, exp: historical, ensemble: "r(1:2)i1p1"} + preprocessors: - multi_model_preprocessor: + example_preprocessor: + ensemble_statistics: + statistics: [median, mean] multi_model_statistics: span: overlap - statistics: [mean, median] + statistics: [min, max] + groupby: [ensemble_statistics] exclude: [NCEP] -see also :func:`esmvalcore.preprocessor.multi_model_statistics`. +This will first compute ensemble mean and median, and then compute the multi-model +min and max separately for the ensemble means and medians. Note that this combination +will not save the individual ensemble members, only the ensemble and multimodel statistics results. -When calling the module inside diagnostic scripts, the input must be given -as a list of cubes. The output will be saved in a dictionary where each -entry contains the resulting cube with the requested statistic operations. +When grouping by a tag not defined in all datasets, the datasets missing the tag will +be grouped together. In the example below, datasets `UKESM` and `ERA5` would belong to the same +group, while the other datasets would belong to either ``group1`` or ``group2`` -.. code-block:: +.. 
code-block:: yaml + + datasets: + - {dataset: CanESM2, exp: historical, ensemble: "r(1:2)i1p1", tag: 'group1'} + - {dataset: CanESM5, exp: historical, ensemble: "r(1:2)i1p1", tag: 'group2'} + - {dataset: CCSM4, exp: historical, ensemble: "r(1:2)i1p1", tag: 'group2'} + - {dataset: UKESM, exp: historical, ensemble: "r(1:2)i1p1"} + - {dataset: ERA5} + + preprocessors: + example_preprocessor: + multi_model_statistics: + span: overlap + statistics: [min, max] + groupby: [tag] - from esmvalcore.preprocessor import multi_model_statistics - statistics = multi_model_statistics([cube1,...,cubeN], 'overlap', ['mean', 'median']) - mean_cube = statistics['mean'] - median_cube = statistics['median'] +Note that those datasets can be excluded if listed in the ``exclude`` option. + +See also :func:`esmvalcore.preprocessor.multi_model_statistics`. .. note:: @@ -1481,7 +1587,7 @@ Parameters: be an array of floating point values. * ``scheme``: interpolation scheme: either ``'linear'`` or ``'nearest'``. There is no default. - + See also :func:`esmvalcore.preprocessor.extract_point`. 
def get_multiproduct_filename(attributes, preproc_dir):
    """Get ensemble/multi-model filename depending on settings.

    Builds a filename of the form
    ``<preproc_dir>/<diagnostic>/<variable_group>/<segments>_<timerange>.nc``
    where the segments come from the relevant attribute keys, with
    duplicate segments removed (order-preserving).
    """
    relevant_keys = (
        'project', 'dataset', 'exp', 'ensemble_statistics',
        'multi_model_statistics', 'mip', 'short_name',
    )

    # Collect filename segments, dropping duplicates while keeping order.
    segments = []
    seen = set()
    for key in relevant_keys:
        if key not in attributes:
            continue
        value = attributes[key]
        if isinstance(value, (list, tuple)):
            value = '-'.join(value)
        for segment in value.split('_'):
            if segment not in seen:
                seen.add(segment)
                segments.append(segment)

    # Append the period and the file extension.
    timerange = attributes['timerange'].replace('/', '-')
    segments.append(f"{timerange}.nc")

    return os.path.join(
        preproc_dir,
        attributes['diagnostic'],
        attributes['variable_group'],
        '_'.join(segments),
    )
+ __version__, other_attributes=attributes) + activity = provenance.activity(namespace + ':esmvaltool==' + __version__, + other_attributes=attributes) return activity @@ -71,8 +71,7 @@ def get_recipe_provenance(documentation, filename): entity = provenance.entity( 'recipe:{}'.format(filename), { 'attribute:description': documentation.get('description', ''), - 'attribute:references': str( - documentation.get('references', [])), + 'attribute:references': str(documentation.get('references', [])), }) attribute_to_authors(entity, documentation.get('authors', [])) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index d5f37f4b8f..f362b74b65 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -4,6 +4,7 @@ import os import re import warnings +from collections import defaultdict from copy import deepcopy from pathlib import Path from pprint import pformat @@ -28,9 +29,9 @@ _parse_period, _truncate_dates, get_input_filelist, + get_multiproduct_filename, get_output_file, get_start_end_date, - get_statistic_output_file, ) from ._provenance import TrackedFile, get_recipe_provenance from ._task import DiagnosticTask, ResumeTask, TaskSet @@ -47,6 +48,7 @@ ) from .preprocessor._derive import get_required from .preprocessor._io import DATASET_KEYS, concatenate_callback +from .preprocessor._other import _group_products from .preprocessor._regrid import ( _spec_to_latlonvals, get_cmor_levels, @@ -654,8 +656,8 @@ def _apply_preprocessor_profile(settings, profile_settings): settings[step].update(args) -def _get_statistic_attributes(products): - """Get attributes for the statistic output products.""" +def _get_common_attributes(products): + """Get common attributes for the output products.""" attributes = {} some_product = next(iter(products)) for key, value in some_product.attributes.items(): @@ -682,8 +684,8 @@ def _get_statistic_attributes(products): return attributes -def _get_remaining_common_settings(step, order, products): - """Get preprocessor settings 
that are shared between products.""" +def _get_downstream_settings(step, order, products): + """Get downstream preprocessor settings shared between products.""" settings = {} remaining_steps = order[order.index(step) + 1:] some_product = next(iter(products)) @@ -703,31 +705,75 @@ def _update_multi_dataset_settings(variable, settings): _exclude_dataset(settings, variable, step) -def _update_statistic_settings(products, order, preproc_dir): - """Define statistic output products.""" - # TODO: move this to multi model statistics function? - # But how to check, with a dry-run option? - step = 'multi_model_statistics' +def _get_tag(step, identifier, statistic): + # Avoid . in filename for percentiles + statistic = statistic.replace('.', '-') - products = {p for p in products if step in p.settings} + if step == 'ensemble_statistics': + tag = 'Ensemble' + statistic.title() + elif identifier == '': + tag = 'MultiModel' + statistic.title() + else: + tag = identifier + statistic.title() + + return tag + + +def _update_multiproduct(input_products, order, preproc_dir, step): + """Return new products that are aggregated over multiple datasets. + + These new products will replace the original products at runtime. + Therefore, they need to have all the settings for the remaining steps. + + The functions in _multimodel.py take output_products as function arguments. + These are the output_products created here. But since those functions are + called from the input products, the products that are created here need to + be added to their ancestors products' settings (). 
+ """ + products = {p for p in input_products if step in p.settings} if not products: - return + return input_products, {} - some_product = next(iter(products)) - for statistic in some_product.settings[step]['statistics']: - check.valid_multimodel_statistic(statistic) - attributes = _get_statistic_attributes(products) - attributes['dataset'] = attributes['alias'] = 'MultiModel{}'.format( - statistic.title().replace('.', '-')) - attributes['filename'] = get_statistic_output_file( - attributes, preproc_dir) - common_settings = _get_remaining_common_settings(step, order, products) - statistic_product = PreprocessorFile(attributes, common_settings) - for product in products: + settings = list(products)[0].settings[step] + + if step == 'ensemble_statistics': + check.ensemble_statistics_preproc(settings) + grouping = ['project', 'dataset', 'exp', 'sub_experiment'] + else: + check.multimodel_statistics_preproc(settings) + grouping = settings.get('groupby', None) + + downstream_settings = _get_downstream_settings(step, order, products) + + relevant_settings = { + 'output_products': defaultdict(dict) + } # pass to ancestors + + output_products = set() + for identifier, products in _group_products(products, by_key=grouping): + common_attributes = _get_common_attributes(products) + + for statistic in settings.get('statistics'): + common_attributes[step] = _get_tag(step, identifier, statistic) + filename = get_multiproduct_filename(common_attributes, + preproc_dir) + common_attributes['filename'] = filename + statistic_product = PreprocessorFile(common_attributes, + downstream_settings) + output_products.add(statistic_product) + relevant_settings['output_products'][identifier][ + statistic] = statistic_product + + return output_products, relevant_settings + + +def update_ancestors(ancestors, step, downstream_settings): + """Retroactively add settings to ancestor products.""" + for product in ancestors: + if step in product.settings: settings = product.settings[step] - if 
'output_products' not in settings: - settings['output_products'] = {} - settings['output_products'][statistic] = statistic_product + for key, value in downstream_settings.items(): + settings[key] = value def _update_extract_shape(settings, config_user): @@ -791,31 +837,36 @@ def _update_timerange(variable, config_user): def _match_products(products, variables): """Match a list of input products to output product attributes.""" - grouped_products = {} + grouped_products = defaultdict(list) + + if not products: + return grouped_products def get_matching(attributes): """Find the output filename which matches input attributes best.""" - score = 0 + best_score = 0 filenames = [] for variable in variables: filename = variable['filename'] - tmp = sum(v == variable.get(k) for k, v in attributes.items()) - if tmp > score: - score = tmp + score = sum(v == variable.get(k) for k, v in attributes.items()) + + if score > best_score: + best_score = score filenames = [filename] - elif tmp == score: + elif score == best_score: filenames.append(filename) + if not filenames: logger.warning( "Unable to find matching output file for input file %s", filename) + return filenames # Group input files by output file for product in products: - for filename in get_matching(product.attributes): - if filename not in grouped_products: - grouped_products[filename] = [] + matching_filenames = get_matching(product.attributes) + for filename in matching_filenames: grouped_products[filename].append(product) return grouped_products @@ -839,6 +890,8 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, sets the correct ancestry. 
""" products = set() + preproc_dir = config_user['preproc_dir'] + for variable in variables: _update_timerange(variable, config_user) variable['filename'] = get_output_file(variable, @@ -848,6 +901,7 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, grouped_ancestors = _match_products(ancestor_products, variables) else: grouped_ancestors = {} + missing_vars = set() for variable in variables: settings = _get_default_settings( @@ -883,6 +937,7 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, settings=settings, ancestors=ancestors, ) + products.add(product) if missing_vars: @@ -891,10 +946,46 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, f'Missing data for preprocessor {name}:{separator}' f'{separator.join(sorted(missing_vars))}') - _update_statistic_settings(products, order, config_user['preproc_dir']) check.reference_for_bias_preproc(products) - for product in products: + ensemble_step = 'ensemble_statistics' + multi_model_step = 'multi_model_statistics' + if ensemble_step in profile: + ensemble_products, ensemble_settings = _update_multiproduct( + products, order, preproc_dir, ensemble_step) + + # check for ensemble_settings to bypass tests + update_ancestors( + ancestors=products, + step=ensemble_step, + downstream_settings=ensemble_settings, + ) + else: + ensemble_products = products + + if multi_model_step in profile: + multimodel_products, multimodel_settings = _update_multiproduct( + ensemble_products, order, preproc_dir, multi_model_step) + + # check for multi_model_settings to bypass tests + update_ancestors( + ancestors=products, + step=multi_model_step, + downstream_settings=multimodel_settings, + ) + + if ensemble_step in profile: + # Update multi-product settings (workaround for lack of better + # ancestry tracking) + update_ancestors( + ancestors=ensemble_products, + step=multi_model_step, + downstream_settings=multimodel_settings, + ) + else: + 
multimodel_products = set() + + for product in products | multimodel_products | ensemble_products: product.check() return products diff --git a/esmvalcore/_recipe_checks.py b/esmvalcore/_recipe_checks.py index 81459cd0f9..2597b10874 100644 --- a/esmvalcore/_recipe_checks.py +++ b/esmvalcore/_recipe_checks.py @@ -13,7 +13,6 @@ from ._data_finder import get_start_end_year from .exceptions import InputFilesNotFound, RecipeError from .preprocessor import TIME_PREPROCESSORS, PreprocessingTask -from .preprocessor._multimodel import STATISTIC_MAPPING logger = logging.getLogger(__name__) @@ -220,16 +219,89 @@ def extract_shape(settings): "{}".format(', '.join(f"'{k}'".lower() for k in valid[key]))) -def valid_multimodel_statistic(statistic): - """Check that `statistic` is a valid argument for multimodel stats.""" - valid_names = ['std'] + list(STATISTIC_MAPPING.keys()) +def _verify_statistics(statistics, step): + """Raise error if multi-model statistics cannot be verified.""" + valid_names = ["mean", "median", "std", "min", "max"] valid_patterns = [r"^(p\d{1,2})(\.\d*)?$"] - if not (statistic in valid_names - or re.match(r'|'.join(valid_patterns), statistic)): + + for statistic in statistics: + if not (statistic in valid_names + or re.match(r'|'.join(valid_patterns), statistic)): + raise RecipeError( + "Invalid value encountered for `statistic` in preprocessor " + f"{step}. Valid values are {valid_names} " + f"or patterns matching {valid_patterns}. Got '{statistic}'.") + + +def _verify_span_value(span): + """Raise error if span argument cannot be verified.""" + valid_names = ('overlap', 'full') + if span not in valid_names: + raise RecipeError( + "Invalid value encountered for `span` in preprocessor " + f"`multi_model_statistics`. Valid values are {valid_names}." 
+ f"Got {span}.") + + +def _verify_groupby(groupby): + """Raise error if groupby arguments cannot be verified.""" + if not isinstance(groupby, list): + raise RecipeError( + "Invalid value encountered for `groupby` in preprocessor " + "`multi_model_statistics`.`groupby` must be defined as a " + f"list. Got {groupby}.") + + +def _verify_keep_input_datasets(keep_input_datasets): + if not isinstance(keep_input_datasets, bool): raise RecipeError( - "Invalid value encountered for `statistic` in preprocessor " - f"`multi_model_statistics`. Valid values are {valid_names} " - f"or patterns matching {valid_patterns}. Got '{statistic}.'") + "Invalid value encountered for `keep_input_datasets`." + f"Must be defined as a boolean. Got {keep_input_datasets}." + ) + + +def _verify_arguments(given, expected): + """Raise error if arguments cannot be verified.""" + for key in given: + if key not in expected: + raise RecipeError( + f"Unexpected keyword argument encountered: {key}. Valid " + f"keywords are: {expected}.") + + +def multimodel_statistics_preproc(settings): + """Check that the multi-model settings are valid.""" + valid_keys = ['span', 'groupby', 'statistics', 'keep_input_datasets'] + _verify_arguments(settings.keys(), valid_keys) + + span = settings.get('span', None) # optional, default: overlap + if span: + _verify_span_value(span) + + groupby = settings.get('groupby', None) # optional, default: None + if groupby: + _verify_groupby(groupby) + + statistics = settings.get('statistics', None) # required + if statistics: + _verify_statistics(statistics, 'multi_model_statistics') + + keep_input_datasets = settings.get('keep_input_datasets', True) + _verify_keep_input_datasets(keep_input_datasets) + + +def ensemble_statistics_preproc(settings): + """Check that the ensemble settings are valid.""" + valid_keys = ['statistics', 'span'] + _verify_arguments(settings.keys(), valid_keys) + + span = settings.get('span', 'overlap') # optional, default: overlap + if span: + 
def group(self, keys: list) -> str:
    """Return a string identifying a group of products.

    The identifier is built by joining, with ``_``, the values of the
    given ``keys`` looked up in ``self.attributes``.  List/tuple values
    are first joined with ``-``.  Keys that are missing (or have falsy
    values) are skipped; an empty ``keys`` yields an empty string.
    """
    if not keys:
        return ''

    if isinstance(keys, str):
        keys = [keys]

    parts = []
    for key in keys:
        value = self.attributes.get(key)
        if not value:
            continue
        if isinstance(value, (list, tuple)):
            value = '-'.join(value)
        parts.append(value)

    return '_'.join(parts)
products.""" + return [ + product for product in self.products if step in product.settings + ] + + def _initialize_products(self, products): + """Initialize products.""" + for product in products: + product.initialize_provenance(self.activity) def _run(self, _): """Run the preprocessor.""" diff --git a/esmvalcore/preprocessor/_multimodel.py b/esmvalcore/preprocessor/_multimodel.py index 0fbd64ca7a..800a035ab0 100644 --- a/esmvalcore/preprocessor/_multimodel.py +++ b/esmvalcore/preprocessor/_multimodel.py @@ -1,5 +1,12 @@ -"""Functions to compute multi-cube statistics.""" +"""Statistics across cubes. +This module contains functions to compute statistics across multiple cubes or +products. + +Wrapper functions separate esmvalcore internals, operating on products, from +generalized functions that operate on iris cubes. These wrappers support +grouped execution by passing a groupby keyword. +""" import logging import re import warnings @@ -14,6 +21,8 @@ from esmvalcore.iris_helpers import date2num from esmvalcore.preprocessor import remove_fx_variables +from ._other import _group_products + logger = logging.getLogger(__name__) STATISTIC_MAPPING = { @@ -80,13 +89,13 @@ def _get_consistent_time_unit(cubes): def _unify_time_coordinates(cubes): """Make sure all cubes' share the same time coordinate. - This function extracts the date information from the cube and - reconstructs the time coordinate, resetting the actual dates to the - 15th of the month or 1st of july for yearly data (consistent with - `regrid_time`), so that there are no mismatches in the time arrays. + This function extracts the date information from the cube and reconstructs + the time coordinate, resetting the actual dates to the 15th of the month or + 1st of July for yearly data (consistent with `regrid_time`), so that there + are no mismatches in the time arrays. - If cubes have different time units, it will use reset the calendar to - a default gregorian calendar with unit "days since 1850-01-01". 
+ If cubes have different time units, it will reset the calendar to a + default gregorian calendar with unit "days since 1850-01-01". Might not work for (sub)daily data, because different calendars may have different number of days in the year. @@ -280,6 +289,15 @@ def _compute_eager(cubes: list, *, operator: iris.analysis.Aggregator, result_cube.data = np.ma.array(result_cube.data) result_cube.remove_coord(CONCAT_DIM) + if result_cube.cell_methods: + cell_method = result_cube.cell_methods[0] + result_cube.cell_methods = None + updated_method = iris.coords.CellMethod( + method=cell_method.method, + coords=cell_method.coord_names, + intervals=cell_method.intervals, + comments=f'input_cubes: {len(cubes)}') + result_cube.add_cell_method(updated_method) return result_cube @@ -306,7 +324,6 @@ def _multicube_statistics(cubes, statistics, span): result_cube = _compute_eager(aligned_cubes, operator=operator, **kwargs) - statistics_cubes[statistic] = result_cube return statistics_cubes @@ -347,6 +364,7 @@ def multi_model_statistics(products, span, statistics, output_products=None, + groupby=None, keep_input_datasets=True): """Compute multi-model statistics. @@ -376,18 +394,21 @@ def multi_model_statistics(products, ---------- products: list Cubes (or products) over which the statistics will be computed. - statistics: list - Statistical metrics to be computed, e.g. [``mean``, ``max``]. Choose - from the operators listed in the iris.analysis package. Percentiles can - be specified like ``pXX.YY``. span: str Overlap or full; if overlap, statitstics are computed on common time- span; if full, statistics are computed on full time spans, ignoring missing data. + statistics: list + Statistical metrics to be computed, e.g. [``mean``, ``max``]. Choose + from the operators listed in the iris.analysis package. Percentiles can + be specified like ``pXX.YY``. output_products: dict For internal use only. A dict with statistics names as keys and preprocessorfiles as values. 
def ensemble_statistics(products, statistics,
                        output_products, span='overlap'):
    """Entry point for ensemble statistics.

    The input products are grouped per ensemble, i.e. by the dataset tags
    ``project``, ``dataset``, ``exp`` and ``sub_experiment``; the
    statistics are then computed per group by calling
    :func:`esmvalcore.preprocessor.multi_model_statistics`.

    Parameters
    ----------
    products: list
        Cubes (or products) over which the statistics will be computed.
    statistics: list
        Statistical metrics to be computed, e.g. [``mean``, ``max``].
        Choose from the operators listed in the iris.analysis package.
        Percentiles can be specified like ``pXX.YY``.
    output_products: dict
        For internal use only. A dict with statistics names as keys and
        preprocessorfiles as values. If products are passed as input, the
        statistics cubes will be assigned to these output products.
    span: str (default: 'overlap')
        Overlap or full; if overlap, statistics are computed on the common
        time span; if full, statistics are computed on full time spans,
        ignoring missing data.

    Returns
    -------
    set
        A set of output_products with the resulting ensemble statistics.

    See Also
    --------
    :func:`esmvalcore.preprocessor.multi_model_statistics` for the full
    description of the core statistics function.
    """
    return multi_model_statistics(
        products=products,
        span=span,
        statistics=statistics,
        output_products=output_products,
        groupby=('project', 'dataset', 'exp', 'sub_experiment'),
        keep_input_datasets=False,
    )
+ output_products: dict + For internal use only. A dict with statistics names as keys and + preprocessorfiles as values. If products are passed as input, the + statistics cubes will be assigned to these output products. + span: str (default: 'overlap') + Overlap or full; if overlap, statitstics are computed on common time- + span; if full, statistics are computed on full time spans, ignoring + missing data. + + Returns + ------- + set + A set of output_products with the resulting ensemble statistics. + + See Also + -------- + :func:`esmvalcore.preprocessor.multi_model_statistics` for + the full description of the core statistics function. + """ + ensemble_grouping = ('project', 'dataset', 'exp', 'sub_experiment') + return multi_model_statistics( + products=products, + span=span, + statistics=statistics, + output_products=output_products, + groupby=ensemble_grouping, + keep_input_datasets=False + ) diff --git a/esmvalcore/preprocessor/_other.py b/esmvalcore/preprocessor/_other.py index 697e8b3347..36bbccafb6 100644 --- a/esmvalcore/preprocessor/_other.py +++ b/esmvalcore/preprocessor/_other.py @@ -1,8 +1,7 @@ -""" -Preprocessor functions that do not fit into any of the categories. -""" +"""Preprocessor functions that do not fit into any of the categories.""" import logging +from collections import defaultdict import dask.array as da @@ -10,8 +9,7 @@ def clip(cube, minimum=None, maximum=None): - """ - Clip values at a specified minimum and/or maximum value + """Clip values at a specified minimum and/or maximum value. Values lower than minimum are set to minimum and values higher than maximum are set to maximum. @@ -38,3 +36,38 @@ def clip(cube, minimum=None, maximum=None): raise ValueError("Maximum should be equal or larger than minimum.") cube.data = da.clip(cube.core_data(), minimum, maximum) return cube + + +def _groupby(iterable, keyfunc): + """Group iterable by key function. 
+ + The items are grouped by the value that is returned by the `keyfunc` + + Parameters + ---------- + iterable : list, tuple or iterable + List of items to group + keyfunc : callable + Used to determine the group of each item. These become the keys + of the returned dictionary + + Returns + ------- + dict + Returns a dictionary with the grouped values. + """ + grouped = defaultdict(set) + for item in iterable: + key = keyfunc(item) + grouped[key].add(item) + + return grouped + + +def _group_products(products, by_key): + """Group products by the given list of attributes.""" + def grouper(product): + return product.group(by_key) + + grouped = _groupby(products, keyfunc=grouper) + return grouped.items() diff --git a/setup.cfg b/setup.cfg index 6b23e7e249..c4ad3cb458 100644 --- a/setup.cfg +++ b/setup.cfg @@ -14,7 +14,6 @@ addopts = --cov-report=xml:test-reports/coverage.xml --cov-report=html:test-reports/coverage_html --html=test-reports/report.html - --numprocesses auto env = MPLBACKEND = Agg flake8-ignore = diff --git a/tests/__init__.py b/tests/__init__.py index 55d5f563a8..41d3e3f830 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -79,5 +79,6 @@ def __init__(self, cubes, filename, attributes, **kwargs): self.cubes = cubes self.filename = filename self.attributes = attributes + self.settings = {} self.mock_ancestors = set() self.wasderivedfrom = mock.Mock(side_effect=self.mock_ancestors.add) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index d4c03ea1e6..c82369698d 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1,4 +1,5 @@ import os +from collections import defaultdict from copy import deepcopy from pathlib import Path from pprint import pformat @@ -238,7 +239,6 @@ def _get_default_settings_for_chl(fix_dir, save_filename, preprocessor): @pytest.fixture def patched_tas_derivation(monkeypatch): - def get_required(short_name, _): if short_name != 'tas': assert False @@ -1842,6 
+1842,242 @@ def test_extract_shape_raises(tmp_path, patched_datafinder, config_user, assert invalid_arg in exc.value.failed_tasks[0].message +def _test_output_product_consistency(products, preprocessor, statistics): + product_out = defaultdict(list) + + for i, product in enumerate(products): + settings = product.settings.get(preprocessor) + if settings: + output_products = settings['output_products'] + + for identifier, statistic_out in output_products.items(): + for statistic, preproc_file in statistic_out.items(): + product_out[identifier, statistic].append(preproc_file) + + # Make sure that output products are consistent + for (identifier, statistic), value in product_out.items(): + assert statistic in statistics + assert len(set(value)) == 1, 'Output products are not equal' + + return product_out + + +def test_ensemble_statistics(tmp_path, patched_datafinder, config_user): + statistics = ['mean', 'max'] + diagnostic = 'diagnostic_name' + variable = 'pr' + preprocessor = 'ensemble_statistics' + + content = dedent(f""" + preprocessors: + default: &default + custom_order: true + area_statistics: + operator: mean + {preprocessor}: + statistics: {statistics} + + diagnostics: + {diagnostic}: + variables: + {variable}: + project: CMIP5 + mip: Amon + start_year: 2000 + end_year: 2002 + preprocessor: default + additional_datasets: + - {{dataset: CanESM2, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + - {{dataset: CCSM4, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + scripts: null + """) + + recipe = get_recipe(tmp_path, content, config_user) + variable = recipe.diagnostics[diagnostic]['preprocessor_output'][variable] + datasets = set([var['dataset'] for var in variable]) + task = next(iter(recipe.tasks)) + + products = task.products + product_out = _test_output_product_consistency(products, preprocessor, + statistics) + + assert len(product_out) == len(datasets) * len(statistics) + + task._initialize_product_provenance() + assert 
next(iter(products)).provenance is not None + + +def test_multi_model_statistics(tmp_path, patched_datafinder, config_user): + statistics = ['mean', 'max'] + diagnostic = 'diagnostic_name' + variable = 'pr' + preprocessor = 'multi_model_statistics' + + content = dedent(f""" + preprocessors: + default: &default + custom_order: true + area_statistics: + operator: mean + {preprocessor}: + span: overlap + statistics: {statistics} + + diagnostics: + {diagnostic}: + variables: + {variable}: + project: CMIP5 + mip: Amon + start_year: 2000 + end_year: 2002 + preprocessor: default + additional_datasets: + - {{dataset: CanESM2, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + - {{dataset: CCSM4, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + scripts: null + """) + + recipe = get_recipe(tmp_path, content, config_user) + variable = recipe.diagnostics[diagnostic]['preprocessor_output'][variable] + task = next(iter(recipe.tasks)) + + products = task.products + product_out = _test_output_product_consistency(products, preprocessor, + statistics) + + assert len(product_out) == len(statistics) + + task._initialize_product_provenance() + assert next(iter(products)).provenance is not None + + +def test_multi_model_statistics_exclude(tmp_path, + patched_datafinder, + config_user): + statistics = ['mean', 'max'] + diagnostic = 'diagnostic_name' + variable = 'pr' + preprocessor = 'multi_model_statistics' + + content = dedent(f""" + preprocessors: + default: &default + custom_order: true + area_statistics: + operator: mean + {preprocessor}: + span: overlap + statistics: {statistics} + groupby: ['project'] + exclude: ['TEST'] + + diagnostics: + {diagnostic}: + variables: + {variable}: + project: CMIP5 + mip: Amon + start_year: 2000 + end_year: 2002 + preprocessor: default + additional_datasets: + - {{dataset: CanESM2, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + - {{dataset: CCSM4, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1"}} + - {{dataset: TEST, project: 
OBS, type: reanaly, version: 1, + tier: 1}} + scripts: null + """) + + recipe = get_recipe(tmp_path, content, config_user) + variable = recipe.diagnostics[diagnostic]['preprocessor_output'][variable] + task = next(iter(recipe.tasks)) + + products = task.products + product_out = _test_output_product_consistency(products, preprocessor, + statistics) + + assert len(product_out) == len(statistics) + assert 'OBS' not in product_out + for id, prods in product_out: + assert id != 'OBS' + assert id == 'CMIP5' + task._initialize_product_provenance() + assert next(iter(products)).provenance is not None + + +def test_groupby_combined_statistics(tmp_path, patched_datafinder, + config_user): + diagnostic = 'diagnostic_name' + variable = 'pr' + + mm_statistics = ['mean', 'max'] + mm_preprocessor = 'multi_model_statistics' + ens_statistics = ['mean', 'median'] + ens_preprocessor = 'ensemble_statistics' + + groupby = [ens_preprocessor, 'tag'] + + content = dedent(f""" + preprocessors: + default: &default + custom_order: true + area_statistics: + operator: mean + {ens_preprocessor}: + span: 'overlap' + statistics: {ens_statistics} + {mm_preprocessor}: + span: overlap + groupby: {groupby} + statistics: {mm_statistics} + + diagnostics: + {diagnostic}: + variables: + {variable}: + project: CMIP5 + mip: Amon + start_year: 2000 + end_year: 2002 + preprocessor: default + additional_datasets: + - {{dataset: CanESM2, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1", tag: group1}} + - {{dataset: CCSM4, exp: [historical, rcp45], + ensemble: "r(1:2)i1p1", tag: group2}} + scripts: null + """) + + recipe = get_recipe(tmp_path, content, config_user) + variable = recipe.diagnostics[diagnostic]['preprocessor_output'][variable] + datasets = set([var['dataset'] for var in variable]) + + products = next(iter(recipe.tasks)).products + + ens_products = _test_output_product_consistency( + products, + ens_preprocessor, + ens_statistics, + ) + + mm_products = _test_output_product_consistency( + 
products, + mm_preprocessor, + mm_statistics, + ) + + assert len(ens_products) == len(datasets) * len(ens_statistics) + assert len( + mm_products) == len(mm_statistics) * len(ens_statistics) * len(groupby) + + def test_weighting_landsea_fraction(tmp_path, patched_datafinder, config_user): TAGS.set_tag_values(TAGS_FOR_TESTING) diff --git a/tests/integration/test_recipe_checks.py b/tests/integration/test_recipe_checks.py index 3fc749d2ca..97b30b8c78 100644 --- a/tests/integration/test_recipe_checks.py +++ b/tests/integration/test_recipe_checks.py @@ -260,3 +260,74 @@ def test_reference_for_bias_preproc_two_refs(): assert "],\nfound 2:\n[" in str(rec_err.value) assert ("].\nPlease also ensure that the reference dataset is " "not excluded with the 'exclude' option") in str(rec_err.value) + + +INVALID_MM_SETTINGS = { + 'wrong_parametre': 'wrong', + 'statistics': ['wrong'], + 'span': 'wrong', + 'groupby': 'wrong', + 'keep_input_datasets': 'wrong' + } + + +def test_invalid_multi_model_settings(): + valid_keys = ['span', 'groupby', 'statistics', 'keep_input_datasets'] + with pytest.raises(RecipeError) as rec_err: + check._verify_arguments(INVALID_MM_SETTINGS, valid_keys) + assert str(rec_err.value) == ( + "Unexpected keyword argument encountered: wrong_parametre. " + "Valid keywords are: " + "['span', 'groupby', 'statistics', 'keep_input_datasets'].") + + +def test_invalid_multi_model_statistics(): + with pytest.raises(RecipeError) as rec_err: + check._verify_statistics( + INVALID_MM_SETTINGS['statistics'], 'multi_model_statistics') + assert str(rec_err.value) == ( + "Invalid value encountered for `statistic` in preprocessor " + "multi_model_statistics. Valid values are " + "['mean', 'median', 'std', 'min', 'max'] " + "or patterns matching ['^(p\\\\d{1,2})(\\\\.\\\\d*)?$']. 
" + "Got 'wrong'.") + + +def test_invalid_multi_model_span(): + with pytest.raises(RecipeError) as rec_err: + check._verify_span_value(INVALID_MM_SETTINGS['span']) + assert str(rec_err.value) == ( + "Invalid value encountered for `span` in preprocessor " + "`multi_model_statistics`. Valid values are ('overlap', 'full')." + "Got wrong." + ) + + +def test_invalid_multi_model_groupy(): + with pytest.raises(RecipeError) as rec_err: + check._verify_groupby(INVALID_MM_SETTINGS['groupby']) + assert str(rec_err.value) == ( + 'Invalid value encountered for `groupby` in preprocessor ' + '`multi_model_statistics`.`groupby` must be defined ' + 'as a list. Got wrong.' + ) + + +def test_invalid_multi_model_keep_input(): + with pytest.raises(RecipeError) as rec_err: + check._verify_keep_input_datasets( + INVALID_MM_SETTINGS['keep_input_datasets']) + assert str(rec_err.value) == ( + 'Invalid value encountered for `keep_input_datasets`.' + 'Must be defined as a boolean. Got wrong.') + + +def test_invalid_ensemble_statistics(): + with pytest.raises(RecipeError) as rec_err: + check._verify_statistics(['wrong'], 'ensemble_statistics') + assert str(rec_err.value) == ( + "Invalid value encountered for `statistic` in preprocessor " + "ensemble_statistics. Valid values are " + "['mean', 'median', 'std', 'min', 'max'] " + "or patterns matching ['^(p\\\\d{1,2})(\\\\.\\\\d*)?$']. 
" + "Got 'wrong'.") diff --git a/tests/unit/preprocessor/_multimodel/test_multimodel.py b/tests/unit/preprocessor/_multimodel/test_multimodel.py index 9460c0199d..7de15d6a0c 100644 --- a/tests/unit/preprocessor/_multimodel/test_multimodel.py +++ b/tests/unit/preprocessor/_multimodel/test_multimodel.py @@ -1,4 +1,4 @@ -"""Unit test for :func:`esmvalcore.preprocessor._multimodel`""" +"""Unit test for :func:`esmvalcore.preprocessor._multimodel`.""" from datetime import datetime @@ -514,13 +514,37 @@ def test_unify_time_coordinates(): class PreprocessorFile: """Mockup to test output of multimodel.""" - def __init__(self, cube=None): + def __init__(self, cube=None, attributes=None): if cube: self.cubes = [cube] + if attributes: + self.attributes = attributes def wasderivedfrom(self, product): pass + def group(self, keys: list) -> str: + """Generate group keyword. + + Returns a string that identifies a group. Concatenates a list of + values from .attributes + """ + if not keys: + return '' + + if isinstance(keys, str): + keys = [keys] + + identifier = [] + for key in keys: + attribute = self.attributes.get(key) + if attribute: + if isinstance(attribute, (list, tuple)): + attribute = '-'.join(attribute) + identifier.append(attribute) + + return '_'.join(identifier) + def test_return_products(): """Check that the right product set is returned.""" @@ -533,12 +557,12 @@ def test_return_products(): products = set([input1, input2]) output = PreprocessorFile() - output_products = {'mean': output} + output_products = {'': {'mean': output}} kwargs = { 'statistics': ['mean'], 'span': 'full', - 'output_products': output_products + 'output_products': output_products[''] } result1 = mm._multiproduct_statistics(products, @@ -552,6 +576,7 @@ def test_return_products(): assert result1 == set([input1, input2, output]) assert result2 == set([output]) + kwargs['output_products'] = output_products result3 = mm.multi_model_statistics(products, **kwargs) result4 = 
mm.multi_model_statistics(products, keep_input_datasets=False, @@ -561,6 +586,48 @@ def test_return_products(): assert result4 == result2 +def test_ensemble_products(): + cube1 = generate_cube_from_dates('monthly', fill_val=1) + cube2 = generate_cube_from_dates('monthly', fill_val=9) + + attributes1 = { + 'project': 'project', 'dataset': 'dataset', + 'exp': 'exp', 'ensemble': '1'} + input1 = PreprocessorFile(cube1, attributes=attributes1) + + attributes2 = { + 'project': 'project', 'dataset': 'dataset', + 'exp': 'exp', 'ensemble': '2'} + input2 = PreprocessorFile(cube2, attributes=attributes2) + + attributes3 = { + 'project': 'project', 'dataset': 'dataset2', + 'exp': 'exp', 'ensemble': '1'} + input3 = PreprocessorFile(cube1, attributes=attributes3) + + attributes4 = { + 'project': 'project', 'dataset': 'dataset2', + 'exp': 'exp', 'ensemble': '2'} + + input4 = PreprocessorFile(cube1, attributes=attributes4) + products = set([input1, input2, input3, input4]) + + output1 = PreprocessorFile() + output2 = PreprocessorFile() + output_products = { + 'project_dataset_exp': {'mean': output1}, + 'project_dataset2_exp': {'mean': output2}} + + kwargs = { + 'statistics': ['mean'], + 'output_products': output_products, + } + + result = mm.ensemble_statistics( + products, **kwargs) + assert len(result) == 2 + + def test_ignore_tas_scalar_height_coord(): """Ignore conflicting aux_coords for height in tas.""" tas_2m = generate_cube_from_dates("monthly") diff --git a/tests/unit/preprocessor/_other/test_other.py b/tests/unit/preprocessor/_other/test_other.py index 08a1ee26c0..5795c56db2 100644 --- a/tests/unit/preprocessor/_other/test_other.py +++ b/tests/unit/preprocessor/_other/test_other.py @@ -9,7 +9,8 @@ from iris.cube import Cube from numpy.testing import assert_array_equal -from esmvalcore.preprocessor._other import clip +from esmvalcore.preprocessor import PreprocessorFile +from esmvalcore.preprocessor._other import _group_products, clip class TestOther(unittest.TestCase): 
@@ -43,5 +44,26 @@ def test_clip(self): clip(cube, 10, 8) +def test_group_products_string_list(): + products = [ + PreprocessorFile( + attributes={ + 'project': 'A', + 'dataset': 'B', + 'filename': 'A_B.nc'}, + settings={}), + PreprocessorFile( + attributes={ + 'project': 'A', + 'dataset': 'C', + 'filename': 'A_C.nc'}, + settings={}) + ] + grouped_by_string = _group_products(products, 'project') + grouped_by_list = _group_products(products, ['project']) + + assert grouped_by_list == grouped_by_string + + if __name__ == '__main__': unittest.main() diff --git a/tests/unit/preprocessor/test_runner.py b/tests/unit/preprocessor/test_runner.py index f0207b831a..4bc65c5b98 100644 --- a/tests/unit/preprocessor/test_runner.py +++ b/tests/unit/preprocessor/test_runner.py @@ -4,7 +4,8 @@ def test_first_argument_name(): """Check that the input type of all preprocessor functions is valid.""" - valid_itypes = ('file', 'files', 'cube', 'cubes', 'products') + valid_itypes = ('file', 'files', 'cube', 'cubes', 'products', + 'input_products') for step in DEFAULT_ORDER: itype = _get_itype(step) assert itype in valid_itypes, ( diff --git a/tests/unit/test_recipe.py b/tests/unit/test_recipe.py index 151f96a417..b48a731958 100644 --- a/tests/unit/test_recipe.py +++ b/tests/unit/test_recipe.py @@ -1,3 +1,4 @@ +from collections import defaultdict from unittest import mock import iris @@ -350,11 +351,30 @@ def test_multi_model_filename(): PreprocessorFile(cube, 'B', {'timerange': '1989/1990'}), PreprocessorFile(cube, 'C', {'timerange': '1991/1992'}), ] - attributes = _recipe._get_statistic_attributes(products) + attributes = _recipe._get_common_attributes(products) assert 'timerange' in attributes assert attributes['timerange'] == '1989/1992' +def test_update_multiproduct_no_product(): + cube = iris.cube.Cube(np.array([1])) + products = [ + PreprocessorFile(cube, 'A', attributes=None, settings={'step': {}})] + order = ('load', 'save') + preproc_dir = '/preproc_dir' + step = 
'multi_model_statistics' + output, settings = _recipe._update_multiproduct( + products, order, preproc_dir, step) + assert output == products + assert settings == {} + + +def test_match_products_no_product(): + variables = [{'var_name': 'var'}] + grouped_products = _recipe._match_products(None, variables) + assert grouped_products == defaultdict(list) + + SCRIPTS_CFG = { 'output_dir': mock.sentinel.output_dir, 'script': mock.sentinel.script,