diff --git a/doc/esmvalcore/preprocessor.rst b/doc/esmvalcore/preprocessor.rst index 2724a91fac..d82875d2b7 100644 --- a/doc/esmvalcore/preprocessor.rst +++ b/doc/esmvalcore/preprocessor.rst @@ -14,6 +14,7 @@ following the default order in which they are applied: * :ref:`Land/Sea/Ice masking` * :ref:`Horizontal regridding` * :ref:`Masking of missing values` +* :ref:`Preprocessing fx variables for spatial statistics` * :ref:`Multi-model statistics` * :ref:`Time operations` * :ref:`Area operations` @@ -486,6 +487,102 @@ combining them into a single mask; here is an example: min_value: -1e20 # small enough not to alter the data # time_window: 10.0 # this will not matter anymore + +.. _preprocessing fx variables for spatial statistics: + +Using mask files for area/volume/zonal statistics +------------------------------------------------- + +Preprocessors like `volume_statistics` may take the optional key `fx_files` to allow +the computation of statistics with fx data; this data is retrieved in the same manner +as regular data files and can be ingested in the statistical computation as-is or it +can be preprocessed: + +Key features: + +- preprocessor steps will be run on the `fx_files: [vars or dicts of vars]` specified in the `area_statistics`, `volume_statistics` or `zonal_statistics` preprocessors; take for instance this example: + +.. 
code-block:: yaml + + preprocessors: + prep: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [volcello, ] + + diagnostics: + diag: + variables: + thetao700_Omon: + short_name: thetao + preprocessor: prep + mip: Omon + exp: historical + ensemble: r1i1p1f1 + grid: gn + start_year: 2010 + end_year: 2014 + additional_datasets: + - {dataset: GFDL-CM4} + - {dataset: HadGEM3-GC31-LL} + - {dataset: UKESM1-0-LL} + + +this setup will run the custom-ordered preprocessor steps in `preprocessors/prep` before (and not including) `volume_statistics` for the fx variable `volcello`; + +- it is also possible to request the same type of fx variables but as dictionaries, providing more information towards the retrieval of the correct data; take this snippet for example: + + +.. code-block:: yaml + + preprocessors: + prep: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{short_name: volcello, mip: Omon, exp: historical}] + + diagnostics: + diag: + variables: + thetao700_Omon: + short_name: thetao + preprocessor: prep + mip: Omon + exp: historical + ensemble: r1i1p1f1 + grid: gn + start_year: 2010 + end_year: 2014 + additional_datasets: + - {dataset: GFDL-CM4} + - {dataset: HadGEM3-GC31-LL} + - {dataset: UKESM1-0-LL} + + +in this case, by providing the extra CMOR keys, we allow the code to find the `volcello` with mip `Omon` and experiment `historical`. This is particularly useful when the actual fx data is not found in the master variable's (`thetao` in this case) data experiment; this is also useful to force the use of time-dependent data (mip: Omon) for preprocessors that need time operations. + +.. note:: + + In the case of fx variables that don't have a time coordinate (e.g. 
Ofx or Efx), + the preprocessor chain is run but the time preprocessor steps are excluded and + not run to avoid code failure. If you intend to run and use the result of a certain + time-preprocessing step, make sure you request an fx variable that has a time coordinate + e.g. instead of using mip Ofx, use Omon. If data is not available for Omon then + that preprocessor step cannot be run and the tool will raise a data not found error. + + Minimum, maximum and interval masking ------------------------------------- diff --git a/esmvalcore/_data_finder.py b/esmvalcore/_data_finder.py index 7a27dee4f6..db6436120b 100644 --- a/esmvalcore/_data_finder.py +++ b/esmvalcore/_data_finder.py @@ -240,7 +240,7 @@ def get_input_filelist(variable, rootpath, drs): return (files, dirnames, filenames) -def get_output_file(variable, preproc_dir): +def get_output_file(variable, preproc_dir, fx_var_alias=None): """Return the full path to the output (preprocessed) file.""" cfg = get_project_config(variable['project']) @@ -249,12 +249,20 @@ def get_output_file(variable, preproc_dir): variable = dict(variable) variable['exp'] = '-'.join(variable['exp']) - outfile = os.path.join( - preproc_dir, - variable['diagnostic'], - variable['variable_group'], - _replace_tags(cfg['output_file'], variable)[0], - ) + if not fx_var_alias: + outfile = os.path.join( + preproc_dir, + variable['diagnostic'], + variable['variable_group'], + _replace_tags(cfg['output_file'], variable)[0], + ) + else: + outfile = os.path.join( + preproc_dir, + variable['diagnostic'], + variable['variable_group'], + _replace_tags(cfg['output_file'], fx_var_alias)[0], + ) if variable['frequency'] != 'fx': outfile += '_{start_year}-{end_year}'.format(**variable) outfile += '.nc' diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index ac8cbcbee9..95c90d543c 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -12,7 +12,8 @@ from . import __version__ from . 
import _recipe_checks as check -from ._config import TAGS, get_activity, get_institutes, replace_tags +from ._config import (TAGS, get_activity, get_institutes, get_project_config, + replace_tags) from ._data_finder import (get_input_filelist, get_output_file, get_statistic_output_file) from ._provenance import TrackedFile, get_recipe_provenance @@ -22,7 +23,7 @@ from .cmor.table import CMOR_TABLES from .preprocessor import (DEFAULT_ORDER, FINAL_STEPS, INITIAL_STEPS, MULTI_MODEL_FUNCTIONS, PreprocessingTask, - PreprocessorFile) + PreprocessorFile, TIME_PREPROCESSORS) from .preprocessor._derive import get_required from .preprocessor._download import synda_search from .preprocessor._io import DATASET_KEYS, concatenate_callback @@ -128,13 +129,13 @@ def _special_name_to_dataset(variable, special_name): if special_name in ('reference_dataset', 'alternative_dataset'): if special_name not in variable: raise RecipeError( - "Preprocessor {} uses {}, but {} is not defined for " - "variable {} of diagnostic {}".format( - variable['preprocessor'], - special_name, - special_name, - variable['short_name'], - variable['diagnostic'], + "Preprocessor {preproc} uses {name}, but {name} is not " + "defined for variable {short_name} of diagnostic " + "{diagnostic}".format( + preproc=variable['preprocessor'], + name=special_name, + short_name=variable['short_name'], + diagnostic=variable['diagnostic'], )) special_name = variable[special_name] @@ -261,8 +262,7 @@ def _limit_datasets(variables, profile, max_datasets=0): if variable not in limited: limited.append(variable) - logger.info("Only considering %s", - ', '.join(v['alias'] for v in limited)) + logger.info("Only considering %s", ', '.join(v['alias'] for v in limited)) return limited @@ -355,32 +355,36 @@ def _get_default_settings(variable, config_user, derive=False): def _add_fxvar_keys(fx_var_dict, variable): """Add keys specific to fx variable to use get_input_filelist.""" fx_variable = dict(variable) + 
fx_variable.update(fx_var_dict) # set variable names fx_variable['variable_group'] = fx_var_dict['short_name'] - fx_variable['short_name'] = fx_var_dict['short_name'] - # specificities of project + # add special ensemble for CMIP5 only if fx_variable['project'] == 'CMIP5': - fx_variable['mip'] = 'fx' fx_variable['ensemble'] = 'r0i0p0' - elif fx_variable['project'] == 'CMIP6': - fx_variable['grid'] = variable['grid'] - if 'mip' in fx_var_dict: - fx_variable['mip'] = fx_var_dict['mip'] - elif fx_variable['project'] in ['OBS', 'OBS6', 'obs4mips']: - fx_variable['mip'] = 'fx' + # add missing cmor info _add_cmor_info(fx_variable, override=True) + return fx_variable -def _get_correct_fx_file(variable, fx_varname, config_user): +def _get_correct_fx_file(variable, fx_variable, config_user): """Get fx files (searching all possible mips).""" - # TODO: allow user to specify certain mip if desired + # make it a dict + if not isinstance(fx_variable, dict): + fx_varname = fx_variable + fx_variable = {'short_name': fx_variable} + else: + fx_varname = fx_variable['short_name'] + + # assemble info from master variable var = dict(variable) var_project = variable['project'] + get_project_config(var_project) cmor_table = CMOR_TABLES[var_project] + valid_fx_vars = [] # Get all fx-related mips ('fx' always first, original mip last) fx_mips = ['fx'] @@ -388,26 +392,31 @@ def _get_correct_fx_file(variable, fx_varname, config_user): [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) fx_mips.append(variable['mip']) + # force only the mip declared by user + if 'mip' in fx_variable: + fx_mips = [fx_variable['mip']] + # Search all mips for available variables + # priority goes to user specified mip if available searched_mips = [] + fx_files = [] for fx_mip in fx_mips: - fx_variable = cmor_table.get_variable(fx_mip, fx_varname) - if fx_variable is not None: + fx_cmor_variable = cmor_table.get_variable(fx_mip, fx_varname) + if fx_cmor_variable is not None: + fx_var_dict = 
dict(fx_variable) searched_mips.append(fx_mip) - fx_var = _add_fxvar_keys( - {'short_name': fx_varname, 'mip': fx_mip}, var) - logger.debug("For CMIP6 fx variable '%s', found table '%s'", - fx_varname, fx_mip) - fx_files = _get_input_files(fx_var, config_user)[0] + fx_var_dict['mip'] = fx_mip + fx_var_dict = _add_fxvar_keys(fx_var_dict, var) + logger.debug("For fx variable '%s', found table '%s'", fx_varname, + fx_mip) + fx_files = _get_input_files(fx_var_dict, config_user)[0] # If files found, return them if fx_files: - logger.debug("Found CMIP6 fx variables '%s':\n%s", - fx_varname, pformat(fx_files)) + valid_fx_vars.append(fx_var_dict) + logger.debug("Found fx variables '%s':\n%s", fx_varname, + pformat(fx_files)) break - else: - # No files found - fx_files = [] # If fx variable was not found in any table, raise exception if not searched_mips: @@ -415,11 +424,17 @@ def _get_correct_fx_file(variable, fx_varname, config_user): f"Requested fx variable '{fx_varname}' not available in " f"any 'fx'-related CMOR table ({fx_mips}) for '{var_project}'") + # flag a warning + if not fx_files: + logger.warning("Missing data for fx variable '%s'", fx_varname) + # allow for empty lists corrected for by NE masks if fx_files: fx_files = fx_files[0] + if valid_fx_vars: + valid_fx_vars = valid_fx_vars[0] - return fx_files + return fx_files, valid_fx_vars def _get_landsea_fraction_fx_dict(variable, config_user): @@ -429,7 +444,9 @@ def _get_landsea_fraction_fx_dict(variable, config_user): if variable['project'] != 'obs4mips': fx_vars.append('sftof') for fx_var in fx_vars: - fx_dict[fx_var] = _get_correct_fx_file(variable, fx_var, config_user) + fx_file, _ = _get_correct_fx_file(variable, fx_var, config_user) + fx_dict[fx_var] = fx_file + return fx_dict @@ -465,8 +482,8 @@ def _update_fx_settings(settings, variable, config_user): if 'mask_landseaice' in settings: logger.debug('Getting fx mask settings now...') settings['mask_landseaice']['fx_files'] = [] - fx_files_dict = { - 
'sftgif': _get_correct_fx_file(variable, 'sftgif', config_user)} + fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) + fx_files_dict = {'sftgif': fx_file} if fx_files_dict['sftgif']: settings['mask_landseaice']['fx_files'].append( fx_files_dict['sftgif']) @@ -478,13 +495,27 @@ def _update_fx_settings(settings, variable, config_user): settings['weighting_landsea_fraction']['fx_files'] = fx_dict logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) - for step in ('area_statistics', 'volume_statistics'): + for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): + var = dict(variable) + fx_files_dict = {} if settings.get(step, {}).get('fx_files'): - var = dict(variable) var['fx_files'] = settings.get(step, {}).get('fx_files') - fx_files_dict = { - fxvar: _get_correct_fx_file(variable, fxvar, config_user) - for fxvar in var['fx_files']} + for fxvar in var['fx_files']: + fxvar_name = fxvar + if isinstance(fxvar, dict): + fxvar_name = fxvar.get("short_name") + if not fxvar_name: + raise RecipeError( + "Key short_name missing from {}".format(fxvar)) + _, cmor_fx_var = _get_correct_fx_file( + variable, fxvar, config_user) + if cmor_fx_var: + fx_files_dict[fxvar_name] = get_output_file( + var, + config_user['preproc_dir'], + fx_var_alias=cmor_fx_var) + + # finally construct the fx files dictionary settings[step]['fx_files'] = fx_files_dict logger.info(msg, step, pformat(fx_files_dict)) @@ -668,13 +699,21 @@ def get_matching(attributes): def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user): + config_user, fx_case=False): """Get preprocessor product definitions for a set of datasets.""" products = set() - for variable in variables: - variable['filename'] = get_output_file(variable, - config_user['preproc_dir']) + if not fx_case: + for variable in variables: + variable['filename'] = get_output_file(variable, + config_user['preproc_dir']) + else: + parent_variable = variables[1] + variables = 
[variables[0]] + for variable in variables: + variable['filename'] = get_output_file(parent_variable, + config_user['preproc_dir'], + fx_var_alias=variable) if ancestor_products: grouped_ancestors = _match_products(ancestor_products, variables) @@ -697,9 +736,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings( - settings=settings, variable=variable, - config_user=config_user) + _update_fx_settings(settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -708,7 +747,11 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_regrid_time(variable, settings) ancestors = grouped_ancestors.get(variable['filename']) - if not ancestors: + fx_files_in_settings = [ + setting.get('fx_files') for setting in settings.values() if + setting.get('fx_files') is not None + ] + if not ancestors or fx_files_in_settings: ancestors = _get_ancestors(variable, config_user) if config_user.get('skip-nonexistent') and not ancestors: logger.info("Skipping: no data found for %s", variable) @@ -732,7 +775,8 @@ def _get_single_preprocessor_task(variables, profile, config_user, name, - ancestor_tasks=None): + ancestor_tasks=None, + fx_case=False): """Create preprocessor tasks for a set of datasets w/ special case fx.""" if ancestor_tasks is None: ancestor_tasks = [] @@ -743,13 +787,23 @@ def _get_single_preprocessor_task(variables, check.check_for_temporal_preprocs(profile) ancestor_products = None - products = _get_preprocessor_products( - variables=variables, - profile=profile, - order=order, - ancestor_products=ancestor_products, - config_user=config_user, - ) + if not fx_case: + products = _get_preprocessor_products( + variables=variables, + profile=profile, + order=order, + ancestor_products=ancestor_products, + config_user=config_user, + ) 
+ else: + products = _get_preprocessor_products( + variables=variables, + profile=profile, + order=order, + ancestor_products=ancestor_products, + config_user=config_user, + fx_case=True + ) if not products: raise RecipeError( @@ -848,6 +902,16 @@ def append(group_prefix, var): return derive_input +def _remove_time_preproc(fxprofile): + """Remove all time preprocessors from the fx profile.""" + fxprofile = deepcopy(fxprofile) + for key in fxprofile: + if key in TIME_PREPROCESSORS: + fxprofile[key] = False + + return fxprofile + + def _get_preprocessor_task(variables, profiles, config_user, task_name): """Create preprocessor task(s) for a set of datasets.""" # First set up the preprocessor profile @@ -866,6 +930,8 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): _add_cmor_info(variable) # Create preprocessor task(s) derive_tasks = [] + fx_preproc_tasks = [] + # set up tasks if variable.get('derive'): # Create tasks to prepare the input data for the derive step derive_profile, profile = _split_derive_profile(profile) @@ -884,6 +950,49 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): ) derive_tasks.append(task) + # special case: fx variable pre-processing + variable_pairs = [] + for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): + if profile.get(step, {}).get('fx_files'): + # conserve profile + fx_profile = deepcopy(profile) + fx_vars = profile.get(step, {}).get('fx_files') + + # Create tasks to prepare the input data for the fx var + order = _extract_preprocessor_order(fx_profile) + for var in variables: + fx_variables = [ + _get_correct_fx_file(var, fx_var, config_user)[1] + for fx_var in fx_vars + ] + for fx_variable in fx_variables: + # list may be (intentionally) empty - catch it here + if not fx_variable: + raise RecipeError( + f"One or more of {step} fx data " + f"for {var['short_name']} are missing. 
" + f"Task can not be performed since there is " + f"no fx data found.") + before, _ = _split_settings(fx_profile, step, order) + # remove time preprocessors for any fx/Ofx/Efx/etc + # that dont have time coords + if fx_variable['frequency'] == 'fx': + before = _remove_time_preproc(before) + fx_name = task_name.split( + TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ + fx_variable['variable_group'] + task = _get_single_preprocessor_task( + [fx_variable, var], + before, + config_user, + name=fx_name, + fx_case=True + ) + fx_preproc_tasks.append(task) + variable_pairs.append([var, fx_variable]) + + derive_tasks.extend(fx_preproc_tasks) + # Create (final) preprocessor task task = _get_single_preprocessor_task( variables, @@ -896,6 +1005,25 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): return task +def _check_duplication(task): + """Check and remove duplicate ancestry tasks.""" + ancestors_filename_dict = OrderedDict() + filenames = [] + if isinstance(task, PreprocessingTask): + for ancestor_task in task.ancestors: + for anc_product in ancestor_task.products: + ancestors_filename_dict[anc_product.filename] = ancestor_task + filenames.append(anc_product.filename) + set_ancestors = list( + {ancestors_filename_dict[filename] for filename in filenames}) + task.ancestors = [ + ancestor for ancestor in task.ancestors + if ancestor in set_ancestors + ] + + return task + + class Recipe: """Recipe object.""" @@ -903,7 +1031,6 @@ class Recipe: 'project', 'activity', 'dataset', 'exp', 'ensemble', 'version' ) """List of keys to be used to compose the alias, ordered by priority.""" - def __init__(self, raw_recipe, config_user, @@ -984,10 +1111,10 @@ def _initialize_datasets(raw_datasets): @staticmethod def _expand_ensemble(variables): - """ - Expand ensemble members to multiple datasets + """Expand ensemble members to multiple datasets. + + Expansion only support ensembles defined as strings, not lists. 
- Expansion only support ensembles defined as strings, not lists """ expanded = [] regex = re.compile(r'\(\d+:\d+\)') @@ -1000,7 +1127,7 @@ def _expand_ensemble(variables): if not match: expanded.append(variable) continue - start, end = match.group(0)[1: -1].split(':') + start, end = match.group(0)[1:-1].split(':') for i in range(int(start), int(end) + 1): expand = deepcopy(variable) expand['ensemble'] = regex.sub(str(i), ensemble, 1) @@ -1077,8 +1204,7 @@ def _initialize_preprocessor_output(self, diagnostic_name, raw_variables, return preprocessor_output def _set_alias(self, preprocessor_output): - """ - Add unique alias for datasets. + """Add unique alias for datasets. Generates a unique alias for each dataset that will be shared by all variables. Tries to make it as small as possible to make it useful for @@ -1095,7 +1221,7 @@ def _set_alias(self, preprocessor_output): Function will not modify alias if it is manually added to the recipe but it will use the dataset info to compute the others - Examples: + Examples -------- - {project: CMIP5, model: EC-Earth, ensemble: r1i1p1} - {project: CMIP6, model: EC-Earth, ensemble: r1i1p1f1} @@ -1119,7 +1245,7 @@ def _set_alias(self, preprocessor_output): - {project: CMIP5, model: EC-Earth, experiment: historical} will generate alias 'EC-Earth' - Parameters: + Parameters ---------- preprocessor_output : dict preprocessor output dictionary @@ -1266,6 +1392,11 @@ def initialize_tasks(self): config_user=self._cfg, task_name=task_name, ) + # remove possible duplicate ancestor tasks that + # preprocess the same + # fx var file needed by different var["activity"] but with + # the same output fx var file + _check_duplication(task) for task0 in task.flatten(): task0.priority = priority tasks.add(task) diff --git a/esmvalcore/preprocessor/_area.py b/esmvalcore/preprocessor/_area.py index dfb39a74ab..c5599688e6 100644 --- a/esmvalcore/preprocessor/_area.py +++ b/esmvalcore/preprocessor/_area.py @@ -221,6 +221,9 @@ def 
area_statistics(cube, operator, fx_files=None): | `max` | Maximum value | +------------+--------------------------------------------------+ + If fx_files is provided, the variable's preprocessor chain will be applied + to each of the requested fx variables, up to and not including this step. + Parameters ---------- cube: iris.cube.Cube @@ -228,8 +231,11 @@ def area_statistics(cube, operator, fx_files=None): operator: str The operation, options: mean, median, min, max, std_dev, sum, variance - fx_files: dict - dictionary of field:filename for the fx_files + fx_files: list + list of field:str short_name for the fx variables requested or + list of field:dict for the fx variables requested, including + but not limited to: keys: short_name, mip, experiment etc (at least + short_name required if dict) Returns ------- diff --git a/esmvalcore/preprocessor/_volume.py b/esmvalcore/preprocessor/_volume.py index b0b0db138d..112474e2df 100644 --- a/esmvalcore/preprocessor/_volume.py +++ b/esmvalcore/preprocessor/_volume.py @@ -176,7 +176,9 @@ def volume_statistics( The volume average is weighted according to the cell volume. Cell volume is calculated from iris's cartography tool multiplied by the cell - thickness. + thickness. If fx_files is provided, the variable's preprocessor chain will + be applied to each of the requested fx variables, up to and not including + this step. Parameters ---------- @@ -184,8 +186,11 @@ def volume_statistics( Input cube. operator: str The operation to apply to the cube, options are: 'mean'. 
- fx_files: dict - dictionary of field:filename for the fx_files + fx_files: list + list of field:str short_name for the fx variables requested or + list of field:dict for the fx variables requested, including + but not limited to: keys: short_name, mip, experiment etc (at least + short_name required if dict) Returns ------- diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index c9efc7a467..5462d1396f 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1906,6 +1906,362 @@ def test_fx_vars_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, assert '_Omon_' not in fx_files['volcello'] +def test_fx_dicts_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + volume_statistics: + operator: mean + fx_files: [{'short_name': 'volcello', 'mip': 'Oyr', + 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.products) == 1 + product = task.products.pop() + + # Check volume_statistics + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_Oyr_' in fx_files['volcello'] + assert '_piControl_' in fx_files['volcello'] + assert '_Omon_' not in fx_files['volcello'] + + +def test_fx_vars_list_no_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + 
preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + area_statistics: + operator: mean + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 0 + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'area_statistics' in product.settings + settings = product.settings['area_statistics'] + assert len(settings) == 1 + assert settings['operator'] == 'mean' + assert 'fx_files' not in settings + + +def test_fx_vars_list_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + 
TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert 'preproc' in fx_files['areacello'] + assert 'CMIP6_CanESM5_Ofx_historical_r1i1p1f1_areacello_2000-2005.nc' in \ + fx_files['areacello'] + + +def test_fx_vars_list_preproc_cmip6_fail(tmp_path, patched_failing_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + 
project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + with pytest.raises(RecipeError) as rec_err: + get_recipe(tmp_path, content, config_user) + msg = ('One or more of volume_statistics fx data for tos are missing. ' + 'Task can not be performed since there is no fx data found.') + assert rec_err == msg + + +def test_fx_vars_dicts_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{'short_name': 'areacello', 'mip': 'Ofx', + 'exp': 'piControl'}, + {'short_name': 'volcello', 'mip': 'Omon', + 'exp': 'historical'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + 
assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert ancestor_product.attributes['mip'] == 'Omon' + assert ancestor_product.attributes['exp'] == 'historical' + assert 'annual_statistics' in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + + +def test_fx_vars_dicts_activity_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{short_name: areacello, + mip: Ofx, exp: piControl, + activity: CMIP}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + product_files = [ + anc_product.filename for anc_product + in task.ancestors[0].products + ] + anc_products = task.ancestors[0].products + assert len(anc_products) == 1 + assert len(product_files) == 1 + assert os.path.basename(product_files[0]) == \ + 
'CMIP6_UKESM1-0-LL_Ofx_piControl_r1i1p1f2_areacello_2000-2005.nc' + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + ancestor_product = list(anc_products)[0] + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + assert ancestor_product.attributes['activity'] == 'CMIP' + assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + + def test_fx_vars_volcello_in_omon_cmip6(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" @@ -2108,33 +2464,103 @@ def test_invalid_fx_var_cmip6(tmp_path, patched_datafinder, config_user): assert msg in str(rec_err_exp.value) -def test_fx_var_invalid_project(tmp_path, patched_datafinder, config_user): +def test_fx_vars_duplicate_files(tmp_path, patched_datafinder, + config_user): content = dedent(""" preprocessors: preproc: - area_statistics: - operator: mean - fx_files: ['areacella'] + custom_order: True + annual_statistics: + operator: mean + extract_region: + start_longitude: -80. + end_longitude: 30. + start_latitude: -80. + end_latitude: 80. 
+ area_statistics: + operator: mean + fx_files: [{short_name: areacello, mip: Ofx, + exp: piControl, activity: CMIP}] + regrid_time: + frequency: yr diagnostics: - diagnostic_name: + diag_437: variables: - tas: + tos: preprocessor: preproc - project: EMAC - mip: Amon - exp: historical + project: CMIP6 + mip: Omon start_year: 2000 end_year: 2005 - ensemble: r1i1p1f1 + ensemble: r1i1p1f2 grid: gn additional_datasets: - - {dataset: CanESM5} + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + diag_439: + variables: + sst: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + sss: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} scripts: null """) - msg = ( - "Unable to load CMOR table (project) 'EMAC' for variable 'areacella' " - "with mip 'Amon'") - with pytest.raises(RecipeError) as rec_err_exp: - get_recipe(tmp_path, content, config_user) - assert str(rec_err_exp.value) == msg + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 3 + all_task_names = [ + 'diag_439/sst', 'diag_437/tos', 'diag_439/sss' + ] + task_name = 'diag_437' + TASKSEP + 'tos' + task = [ + elem for elem in recipe.tasks if elem.name == task_name + ][0] + assert task.name in all_task_names + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + anc_name = 'diag_437' + TASKSEP + 'fx_area-volume_stats_areacello' + assert anc_name in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'regrid_time' in product.settings + assert product.attributes['short_name'] == 'tos' + for anc_product in task.ancestors[0].products: + assert 
anc_product.attributes['short_name'] == 'areacello' + + # identify tasks + task_name_a = 'diag_439' + TASKSEP + 'sss' + task_a = [ + elem for elem in recipe.tasks if elem.name == task_name_a + ] + assert task_a + task_a = task_a[0] + task_name_b = 'diag_439' + TASKSEP + 'sst' + task_b = [ + elem for elem in recipe.tasks if elem.name == task_name_b + ] + assert task_b + task_b = task_b[0] + assert len(task_a.ancestors) == 1 + assert len(task_b.ancestors) == 1