From 4b21a4c9014318772e75e410fab2949410a1cb36 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 21 Feb 2020 15:46:49 +0000 Subject: [PATCH 01/34] functionality for fx vars data to be preprocessed part of area and volume statistiics --- esmvalcore/_recipe.py | 267 +++++++++++++++++++++++++++++++----------- 1 file changed, 199 insertions(+), 68 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 4c52008d6d..b093c58af3 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -12,7 +12,8 @@ from . import __version__ from . import _recipe_checks as check -from ._config import TAGS, get_activity, get_institutes, replace_tags +from ._config import (TAGS, get_activity, get_institutes, get_project_config, + replace_tags) from ._data_finder import (get_input_filelist, get_output_file, get_statistic_output_file) from ._provenance import TrackedFile, get_recipe_provenance @@ -22,7 +23,7 @@ from .cmor.table import CMOR_TABLES from .preprocessor import (DEFAULT_ORDER, FINAL_STEPS, INITIAL_STEPS, MULTI_MODEL_FUNCTIONS, PreprocessingTask, - PreprocessorFile) + PreprocessorFile, TIME_PREPROCESSORS) from .preprocessor._derive import get_required from .preprocessor._download import synda_search from .preprocessor._io import DATASET_KEYS, concatenate_callback @@ -128,13 +129,13 @@ def _special_name_to_dataset(variable, special_name): if special_name in ('reference_dataset', 'alternative_dataset'): if special_name not in variable: raise RecipeError( - "Preprocessor {} uses {}, but {} is not defined for " - "variable {} of diagnostic {}".format( - variable['preprocessor'], - special_name, - special_name, - variable['short_name'], - variable['diagnostic'], + "Preprocessor {preproc} uses {name}, but {name} is not " + "defined for variable {short_name} of diagnostic " + "{diagnostic}".format( + preproc=variable['preprocessor'], + name=special_name, + short_name=variable['short_name'], + diagnostic=variable['diagnostic'], )) special_name = variable[special_name] @@ -261,8 +262,7 @@ def _limit_datasets(variables, profile, max_datasets=0): if variable not in limited: limited.append(variable) - logger.info("Only considering %s", - ', '.join(v['alias'] for v in limited)) + logger.info("Only considering %s", ', '.join(v['alias'] for v in limited)) return limited @@ -352,32 +352,36 @@ def _get_default_settings(variable, config_user, derive=False): def _add_fxvar_keys(fx_var_dict, variable): """Add keys specific to fx variable to use get_input_filelist.""" fx_variable = dict(variable) + fx_variable.update(fx_var_dict) # set variable names fx_variable['variable_group'] = fx_var_dict['short_name'] - fx_variable['short_name'] = fx_var_dict['short_name'] - # specificities of project + # add special ensemble for CMIP5 only if fx_variable['project'] == 'CMIP5': - fx_variable['mip'] = 'fx' fx_variable['ensemble'] = 'r0i0p0' - elif fx_variable['project'] == 'CMIP6': - fx_variable['grid'] = variable['grid'] - if 'mip' in fx_var_dict: - fx_variable['mip'] = fx_var_dict['mip'] - elif fx_variable['project'] in ['OBS', 'OBS6', 'obs4mips']: - fx_variable['mip'] = 'fx' + # add missing cmor info _add_cmor_info(fx_variable, override=True) + return fx_variable -def _get_correct_fx_file(variable, fx_varname, config_user): +def _get_correct_fx_file(variable, fx_variable, config_user): """Get fx files (searching all possible mips).""" - # TODO: allow user to specify certain mip if desired + # make it a dict + if not isinstance(fx_variable, dict): + fx_varname = fx_variable + fx_variable = {'short_name': fx_variable} + else: + fx_varname = fx_variable['short_name'] + + # assemble info from master variable var = dict(variable) var_project = variable['project'] + get_project_config(var_project) cmor_table = CMOR_TABLES[var_project] + valid_fx_vars = [] # Get all fx-related mips ('fx' always first, original mip last) fx_mips = ['fx'] @@ -385,26 +389,31 @@ def _get_correct_fx_file(variable, fx_varname, config_user): [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) fx_mips.append(variable['mip']) + # force only the mip declared by user + if 'mip' in fx_variable: + fx_mips = [fx_variable['mip']] + # Search all mips for available variables + # priority goes to user specified mip if available searched_mips = [] + fx_files = [] for fx_mip in fx_mips: - fx_variable = cmor_table.get_variable(fx_mip, fx_varname) - if fx_variable is not None: + fx_cmor_variable = cmor_table.get_variable(fx_mip, fx_varname) + if fx_cmor_variable is not None: + fx_var_dict = dict(fx_variable) searched_mips.append(fx_mip) - fx_var = _add_fxvar_keys( - {'short_name': fx_varname, 'mip': fx_mip}, var) - logger.debug("For CMIP6 fx variable '%s', found table '%s'", - fx_varname, fx_mip) - fx_files = _get_input_files(fx_var, config_user)[0] + fx_var_dict['mip'] = fx_mip + fx_var_dict = _add_fxvar_keys(fx_var_dict, var) + logger.debug("For fx variable '%s', found table '%s'", fx_varname, + fx_mip) + fx_files = _get_input_files(fx_var_dict, config_user)[0] # If files found, return them if fx_files: - logger.debug("Found CMIP6 fx variables '%s':\n%s", - fx_varname, pformat(fx_files)) + valid_fx_vars.append(fx_var_dict) + logger.debug("Found fx variables '%s':\n%s", fx_varname, + pformat(fx_files)) break - else: - # No files found - fx_files = [] # If fx variable was not found in any table, raise exception if not searched_mips: @@ -412,11 +421,17 @@ def _get_correct_fx_file(variable, fx_varname, config_user): f"Requested fx variable '{fx_varname}' not available in " f"any 'fx'-related CMOR table ({fx_mips}) for '{var_project}'") + # flag a warning + if not fx_files: + logger.warning("Missing data for fx variable '%s'", fx_varname) + # allow for empty lists corrected for by NE masks if fx_files: fx_files = fx_files[0] + if valid_fx_vars: + valid_fx_vars = valid_fx_vars[0] - return fx_files + return fx_files, valid_fx_vars def _get_landsea_fraction_fx_dict(variable, config_user): @@ -426,7 +441,9 @@ def _get_landsea_fraction_fx_dict(variable, config_user): if variable['project'] != 'obs4mips': fx_vars.append('sftof') for fx_var in fx_vars: - fx_dict[fx_var] = _get_correct_fx_file(variable, fx_var, config_user) + fx_file, _ = _get_correct_fx_file(variable, fx_var, config_user) + fx_dict[fx_var] = fx_file + return fx_dict @@ -462,8 +479,8 @@ def _update_fx_settings(settings, variable, config_user): if 'mask_landseaice' in settings: logger.debug('Getting fx mask settings now...') settings['mask_landseaice']['fx_files'] = [] - fx_files_dict = { - 'sftgif': _get_correct_fx_file(variable, 'sftgif', config_user)} + fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) + fx_files_dict = {'sftgif': fx_file} if fx_files_dict['sftgif']: settings['mask_landseaice']['fx_files'].append( fx_files_dict['sftgif']) @@ -475,13 +492,27 @@ def _update_fx_settings(settings, variable, config_user): settings['weighting_landsea_fraction']['fx_files'] = fx_dict logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) - for step in ('area_statistics', 'volume_statistics'): + for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): + var = dict(variable) + fx_files_dict = {} if settings.get(step, {}).get('fx_files'): - var = dict(variable) var['fx_files'] = settings.get(step, {}).get('fx_files') - fx_files_dict = { - fxvar: _get_correct_fx_file(variable, fxvar, config_user) - for fxvar in var['fx_files']} + for fxvar in var['fx_files']: + fxvar_name = fxvar + if isinstance(fxvar, dict): + fxvar_name = fxvar.get("short_name") + if not fxvar_name: + raise RecipeError( + "Key short_name missing from {}".format(fxvar)) + _, cmor_fx_var = _get_correct_fx_file( + variable, fxvar, config_user) + if cmor_fx_var: + fx_files_dict[fxvar_name] = get_output_file( + var, + config_user['preproc_dir'], + fx_var_alias=cmor_fx_var) + + # finally construct the fx files dictionary settings[step]['fx_files'] = fx_files_dict logger.info(msg, step, pformat(fx_files_dict)) @@ -665,13 +696,21 @@ def get_matching(attributes): def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user): + config_user, fx_case=False): """Get preprocessor product definitions for a set of datasets.""" products = set() - for variable in variables: - variable['filename'] = get_output_file(variable, - config_user['preproc_dir']) + if not fx_case: + for variable in variables: + variable['filename'] = get_output_file(variable, + config_user['preproc_dir']) + else: + parent_variable = variables[1] + variables = [variables[0]] + for variable in variables: + variable['filename'] = get_output_file(parent_variable, + config_user['preproc_dir'], + fx_var_alias=variable) if ancestor_products: grouped_ancestors = _match_products(ancestor_products, variables) @@ -694,9 +733,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings( - settings=settings, variable=variable, - config_user=config_user) + _update_fx_settings(settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -705,7 +744,11 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_regrid_time(variable, settings) ancestors = grouped_ancestors.get(variable['filename']) - if not ancestors: + fx_files_in_settings = [ + setting.get('fx_files') for setting in settings.values() if + setting.get('fx_files') is not None + ] + if not ancestors or fx_files_in_settings: ancestors = _get_ancestors(variable, config_user) if config_user.get('skip-nonexistent') and not ancestors: logger.info("Skipping: no data found for %s", variable) @@ -729,7 +772,8 @@ def _get_single_preprocessor_task(variables, profile, config_user, name, - ancestor_tasks=None): + ancestor_tasks=None, + fx_case=False): """Create preprocessor tasks for a set of datasets w/ special case fx.""" if ancestor_tasks is None: ancestor_tasks = [] @@ -740,13 +784,23 @@ def _get_single_preprocessor_task(variables, check.check_for_temporal_preprocs(profile) ancestor_products = None - products = _get_preprocessor_products( - variables=variables, - profile=profile, - order=order, - ancestor_products=ancestor_products, - config_user=config_user, - ) + if not fx_case: + products = _get_preprocessor_products( + variables=variables, + profile=profile, + order=order, + ancestor_products=ancestor_products, + config_user=config_user, + ) + else: + products = _get_preprocessor_products( + variables=variables, + profile=profile, + order=order, + ancestor_products=ancestor_products, + config_user=config_user, + fx_case=True + ) if not products: raise RecipeError( @@ -845,6 +899,16 @@ def append(group_prefix, var): return derive_input +def _remove_time_preproc(fxprofile): + """Remove all time preprocessors from the fx profile.""" + fxprofile = deepcopy(fxprofile) + for key in fxprofile: + if key in TIME_PREPROCESSORS: + fxprofile[key] = False + + return fxprofile + + def _get_preprocessor_task(variables, profiles, config_user, task_name): """Create preprocessor task(s) for a set of datasets.""" # First set up the preprocessor profile @@ -863,6 +927,8 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): _add_cmor_info(variable) # Create preprocessor task(s) derive_tasks = [] + fx_preproc_tasks = [] + # set up tasks if variable.get('derive'): # Create tasks to prepare the input data for the derive step derive_profile, profile = _split_derive_profile(profile) @@ -881,6 +947,49 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): ) derive_tasks.append(task) + # special case: fx variable pre-processing + variable_pairs = [] + for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): + if profile.get(step, {}).get('fx_files'): + # conserve profile + fx_profile = deepcopy(profile) + fx_vars = profile.get(step, {}).get('fx_files') + + # Create tasks to prepare the input data for the fx var + order = _extract_preprocessor_order(fx_profile) + for var in variables: + fx_variables = [ + _get_correct_fx_file(var, fx_var, config_user)[1] + for fx_var in fx_vars + ] + for fx_variable in fx_variables: + # list may be (intentionally) empty - catch it here + if not fx_variable: + raise RecipeError( + f"One or more of {step} fx data " + f"for {var['short_name']} are missing. " + f"Task can not be performed since there is " + f"no fx data found.") + before, _ = _split_settings(fx_profile, step, order) + # remove time preprocessors for any fx/Ofx/Efx/etc + # that dont have time coords + if fx_variable['frequency'] == 'fx': + before = _remove_time_preproc(before) + fx_name = task_name.split( + TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ + fx_variable['variable_group'] + task = _get_single_preprocessor_task( + [fx_variable, var], + before, + config_user, + name=fx_name, + fx_case=True + ) + fx_preproc_tasks.append(task) + variable_pairs.append([var, fx_variable]) + + derive_tasks.extend(fx_preproc_tasks) + # Create (final) preprocessor task task = _get_single_preprocessor_task( variables, @@ -893,6 +1002,25 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): return task +def _check_duplication(task): + """Check and remove duplicate ancestry tasks.""" + ancestors_filename_dict = OrderedDict() + filenames = [] + if isinstance(task, PreprocessingTask): + for ancestor_task in task.ancestors: + for anc_product in ancestor_task.products: + ancestors_filename_dict[anc_product.filename] = ancestor_task + filenames.append(anc_product.filename) + set_ancestors = list( + {ancestors_filename_dict[filename] for filename in filenames}) + task.ancestors = [ + ancestor for ancestor in task.ancestors + if ancestor in set_ancestors + ] + + return task + + class Recipe: """Recipe object.""" @@ -900,7 +1028,6 @@ class Recipe: 'project', 'activity', 'dataset', 'exp', 'ensemble', 'version' ) """List of keys to be used to compose the alias, ordered by priority.""" - def __init__(self, raw_recipe, config_user, @@ -981,10 +1108,10 @@ def _initialize_datasets(raw_datasets): @staticmethod def _expand_ensemble(variables): - """ - Expand ensemble members to multiple datasets + """Expand ensemble members to multiple datasets. + + Expansion only support ensembles defined as strings, not lists. - Expansion only support ensembles defined as strings, not lists """ expanded = [] regex = re.compile(r'\(\d+:\d+\)') @@ -997,7 +1124,7 @@ def _expand_ensemble(variables): if not match: expanded.append(variable) continue - start, end = match.group(0)[1: -1].split(':') + start, end = match.group(0)[1:-1].split(':') for i in range(int(start), int(end) + 1): expand = deepcopy(variable) expand['ensemble'] = regex.sub(str(i), ensemble, 1) @@ -1074,8 +1201,7 @@ def _initialize_preprocessor_output(self, diagnostic_name, raw_variables, return preprocessor_output def _set_alias(self, preprocessor_output): - """ - Add unique alias for datasets. + """Add unique alias for datasets. Generates a unique alias for each dataset that will be shared by all variables. Tries to make it as small as possible to make it useful for @@ -1092,7 +1218,7 @@ def _set_alias(self, preprocessor_output): Function will not modify alias if it is manually added to the recipe but it will use the dataset info to compute the others - Examples: + Examples -------- - {project: CMIP5, model: EC-Earth, ensemble: r1i1p1} - {project: CMIP6, model: EC-Earth, ensemble: r1i1p1f1} @@ -1116,7 +1242,7 @@ def _set_alias(self, preprocessor_output): - {project: CMIP5, model: EC-Earth, experiment: historical} will generate alias 'EC-Earth' - Parameters: + Parameters ---------- preprocessor_output : dict preprocessor output dictionary @@ -1263,6 +1389,11 @@ def initialize_tasks(self): config_user=self._cfg, task_name=task_name, ) + # remove possible duplicate ancestor tasks that + # preprocess the same + # fx var file needed by different var["activity"] but with + # the same output fx var file + _check_duplication(task) for task0 in task.flatten(): task0.priority = priority tasks.add(task) From 8159f597b4544eaceec11ba118a3d045d2199673 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 21 Feb 2020 15:47:12 +0000 Subject: [PATCH 02/34] slight change for special fx vars case --- esmvalcore/_data_finder.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/esmvalcore/_data_finder.py b/esmvalcore/_data_finder.py index 7a27dee4f6..db6436120b 100644 --- a/esmvalcore/_data_finder.py +++ b/esmvalcore/_data_finder.py @@ -240,7 +240,7 @@ def get_input_filelist(variable, rootpath, drs): return (files, dirnames, filenames) -def get_output_file(variable, preproc_dir): +def get_output_file(variable, preproc_dir, fx_var_alias=None): """Return the full path to the output (preprocessed) file.""" cfg = get_project_config(variable['project']) @@ -249,12 +249,20 @@ def get_output_file(variable, preproc_dir): variable = dict(variable) variable['exp'] = '-'.join(variable['exp']) - outfile = os.path.join( - preproc_dir, - variable['diagnostic'], - variable['variable_group'], - _replace_tags(cfg['output_file'], variable)[0], - ) + if not fx_var_alias: + outfile = os.path.join( + preproc_dir, + variable['diagnostic'], + variable['variable_group'], + _replace_tags(cfg['output_file'], variable)[0], + ) + else: + outfile = os.path.join( + preproc_dir, + variable['diagnostic'], + variable['variable_group'], + _replace_tags(cfg['output_file'], fx_var_alias)[0], + ) if variable['frequency'] != 'fx': outfile += '_{start_year}-{end_year}'.format(**variable) outfile += '.nc' From d0cb3f9a56c754dae0a85122fd05a513af8a39e5 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 21 Feb 2020 15:47:48 +0000 Subject: [PATCH 03/34] comprehensive testing for fx vars preprocessing hadling for area and volume statistics --- tests/integration/test_recipe.py | 460 +++++++++++++++++++++++++++++-- 1 file changed, 443 insertions(+), 17 deletions(-) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index fbc40fc787..4952502ba6 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1895,6 +1895,362 @@ def test_fx_vars_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, assert '_Omon_' not in fx_files['volcello'] +def test_fx_dicts_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + volume_statistics: + operator: mean + fx_files: [{'short_name': 'volcello', 'mip': 'Oyr', + 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.products) == 1 + product = task.products.pop() + + # Check volume_statistics + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_Oyr_' in fx_files['volcello'] + assert '_piControl_' in fx_files['volcello'] + assert '_Omon_' not in fx_files['volcello'] + + +def test_fx_vars_list_no_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + area_statistics: + operator: mean + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 0 + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'area_statistics' in product.settings + settings = product.settings['area_statistics'] + assert len(settings) == 1 + assert settings['operator'] == 'mean' + assert 'fx_files' not in settings + + +def test_fx_vars_list_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert 'preproc' in fx_files['areacello'] + assert 'CMIP6_CanESM5_Ofx_historical_r1i1p1f1_areacello_2000-2005.nc' in \ + fx_files['areacello'] + + +def test_fx_vars_list_preproc_cmip6_fail(tmp_path, patched_failing_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + with pytest.raises(RecipeError) as rec_err: + get_recipe(tmp_path, content, config_user) + msg = ('One or more of volume_statistics fx data for tos are missing. ' + 'Task can not be performed since there is no fx data found.') + assert rec_err == msg + + +def test_fx_vars_dicts_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{'short_name': 'areacello', 'mip': 'Ofx', + 'exp': 'piControl'}, + {'short_name': 'volcello', 'mip': 'Omon', + 'exp': 'historical'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert ancestor_product.attributes['mip'] == 'Omon' + assert ancestor_product.attributes['exp'] == 'historical' + assert 'annual_statistics' in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + + +def test_fx_vars_dicts_activity_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{short_name: areacello, + mip: Ofx, exp: piControl, + activity: CMIP}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + product_files = [ + anc_product.filename for anc_product + in task.ancestors[0].products + ] + anc_products = task.ancestors[0].products + assert len(anc_products) == 1 + assert len(product_files) == 1 + assert os.path.basename(product_files[0]) == \ + 'CMIP6_UKESM1-0-LL_Ofx_piControl_r1i1p1f2_areacello_2000-2005.nc' + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + ancestor_product = list(anc_products)[0] + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + assert ancestor_product.attributes['activity'] == 'CMIP' + assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + + def test_fx_vars_volcello_in_omon_cmip6(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" @@ -2097,33 +2453,103 @@ def test_invalid_fx_var_cmip6(tmp_path, patched_datafinder, config_user): assert msg in str(rec_err_exp.value) -def test_fx_var_invalid_project(tmp_path, patched_datafinder, config_user): +def test_fx_vars_duplicate_files(tmp_path, patched_datafinder, + config_user): content = dedent(""" preprocessors: preproc: - area_statistics: - operator: mean - fx_files: ['areacella'] + custom_order: True + annual_statistics: + operator: mean + extract_region: + start_longitude: -80. + end_longitude: 30. + start_latitude: -80. + end_latitude: 80. + area_statistics: + operator: mean + fx_files: [{short_name: areacello, mip: Ofx, + exp: piControl, activity: CMIP}] + regrid_time: + frequency: yr diagnostics: - diagnostic_name: + diag_437: variables: - tas: + tos: preprocessor: preproc - project: EMAC - mip: Amon - exp: historical + project: CMIP6 + mip: Omon start_year: 2000 end_year: 2005 - ensemble: r1i1p1f1 + ensemble: r1i1p1f2 grid: gn additional_datasets: - - {dataset: CanESM5} + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + diag_439: + variables: + sst: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + sss: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} scripts: null """) - msg = ( - "Unable to load CMOR table (project) 'EMAC' for variable 'areacella' " - "with mip 'Amon'") - with pytest.raises(RecipeError) as rec_err_exp: - get_recipe(tmp_path, content, config_user) - assert str(rec_err_exp.value) == msg + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 3 + all_task_names = [ + 'diag_439/sst', 'diag_437/tos', 'diag_439/sss' + ] + task_name = 'diag_437' + TASKSEP + 'tos' + task = [ + elem for elem in recipe.tasks if elem.name == task_name + ][0] + assert task.name in all_task_names + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + anc_name = 'diag_437' + TASKSEP + 'fx_area-volume_stats_areacello' + assert anc_name in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'regrid_time' in product.settings + assert product.attributes['short_name'] == 'tos' + for anc_product in task.ancestors[0].products: + assert anc_product.attributes['short_name'] == 'areacello' + + # identify tasks + task_name_a = 'diag_439' + TASKSEP + 'sss' + task_a = [ + elem for elem in recipe.tasks if elem.name == task_name_a + ] + assert task_a + task_a = task_a[0] + task_name_b = 'diag_439' + TASKSEP + 'sst' + task_b = [ + elem for elem in recipe.tasks if elem.name == task_name_b + ] + assert task_b + task_b = task_b[0] + assert len(task_a.ancestors) == 1 + assert len(task_b.ancestors) == 1 From 4c3202fb25cae77a0045a451f24227c7bafd4428 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 24 Feb 2020 11:35:40 +0000 Subject: [PATCH 04/34] Klaus suggestion --- esmvalcore/_data_finder.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/esmvalcore/_data_finder.py b/esmvalcore/_data_finder.py index db6436120b..17c13f6488 100644 --- a/esmvalcore/_data_finder.py +++ b/esmvalcore/_data_finder.py @@ -249,20 +249,13 @@ def get_output_file(variable, preproc_dir, fx_var_alias=None): variable = dict(variable) variable['exp'] = '-'.join(variable['exp']) - if not fx_var_alias: - outfile = os.path.join( - preproc_dir, - variable['diagnostic'], - variable['variable_group'], - _replace_tags(cfg['output_file'], variable)[0], - ) - else: - outfile = os.path.join( - preproc_dir, - variable['diagnostic'], - variable['variable_group'], - _replace_tags(cfg['output_file'], fx_var_alias)[0], - ) + tag_source = variable if fx_var_alias is None else fx_var_alias + outfile = os.path.join( + preproc_dir, + variable['diagnostic'], + variable['variable_group'], + _replace_tags(cfg['output_file'], tag_source)[0], + ) if variable['frequency'] != 'fx': outfile += '_{start_year}-{end_year}'.format(**variable) outfile += '.nc' From 5dadda1f48c0dd78203fb2bf62e3c4fedb13a723 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 24 Feb 2020 11:43:22 +0000 Subject: [PATCH 05/34] Klaus suggestion Co-Authored-By: Klaus Zimmermann --- esmvalcore/_recipe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index b093c58af3..a52f95fb09 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -372,7 +372,7 @@ def _get_correct_fx_file(variable, fx_variable, config_user): # make it a dict if not isinstance(fx_variable, dict): fx_varname = fx_variable - fx_variable = {'short_name': fx_variable} + fx_variable = {'short_name': fx_varname} else: fx_varname = fx_variable['short_name'] From 887fe65c5083440688e5341139d6f62a538f019c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 24 Feb 2020 15:17:24 +0000 Subject: [PATCH 06/34] Klaus suggestion --- esmvalcore/_recipe.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index a52f95fb09..c9a087abe9 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -383,15 +383,15 @@ def _get_correct_fx_file(variable, fx_variable, config_user): cmor_table = CMOR_TABLES[var_project] valid_fx_vars = [] - # Get all fx-related mips ('fx' always first, original mip last) - fx_mips = ['fx'] - fx_mips.extend( - [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) - fx_mips.append(variable['mip']) - # force only the mip declared by user if 'mip' in fx_variable: fx_mips = [fx_variable['mip']] + else: + # Get all fx-related mips ('fx' always first, original mip last) + fx_mips = ['fx'] + fx_mips.extend( + [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) + fx_mips.append(variable['mip']) # Search all mips for available variables # priority goes to user specified mip if available From c4f01e6a23799c40de854c1d930ee01429687874 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 11:20:08 +0000 Subject: [PATCH 07/34] batch suggestions by Klaus --- esmvalcore/_recipe.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index c9a087abe9..b5b25c434a 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -370,7 +370,7 @@ def _add_fxvar_keys(fx_var_dict, variable): def _get_correct_fx_file(variable, fx_variable, config_user): """Get fx files (searching all possible mips).""" # make it a dict - if not isinstance(fx_variable, dict): + if isinstance(fx_variable, str): fx_varname = fx_variable fx_variable = {'short_name': fx_varname} else: @@ -495,15 +495,17 @@ def _update_fx_settings(settings, variable, config_user): for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): var = dict(variable) fx_files_dict = {} - if settings.get(step, {}).get('fx_files'): - var['fx_files'] = settings.get(step, {}).get('fx_files') + var['fx_files'] = settings.get(step, {}).get('fx_files') + if var['fx_files']: for fxvar in var['fx_files']: fxvar_name = fxvar if isinstance(fxvar, dict): - fxvar_name = fxvar.get("short_name") - if not fxvar_name: + try: + fxvar_name = fxvar["short_name"] + except KeyError: raise RecipeError( - "Key short_name missing from {}".format(fxvar)) + "No valid short_name found in {}".format(fxvar) + ) _, cmor_fx_var = _get_correct_fx_file( variable, fxvar, config_user) if cmor_fx_var: From d485ffe6f51332dc69eddcb743cab47ec5629a93 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 11:54:47 +0000 Subject: [PATCH 08/34] comments from Klaus --- esmvalcore/_recipe.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index b5b25c434a..f8f6ea5cdf 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -466,18 +466,18 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _update_fx_settings(settings, variable, config_user): - """Find and set the FX mask settings.""" +def _get_fx_mask_settings(variable, settings, config_user): + """Assemble the fx var structures for land/sea masking.""" msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" if 'mask_landsea' in settings: - logger.debug('Getting fx mask settings now...') + logger.debug('Getting land-sea fx mask settings now...') fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] settings['mask_landsea']['fx_files'] = fx_list logger.info(msg, 'land/sea masking', pformat(fx_dict)) if 'mask_landseaice' in settings: - logger.debug('Getting fx mask settings now...') + logger.debug('Getting land-seaice fx mask settings now...') settings['mask_landseaice']['fx_files'] = [] fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) fx_files_dict = {'sftgif': fx_file} @@ -492,6 +492,10 @@ def _update_fx_settings(settings, variable, config_user): settings['weighting_landsea_fraction']['fx_files'] = fx_dict logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) + +def _get_fx_stats_settings(variable, settings, config_user): + """Assemble the fx vars structures for area/volume stats.""" + msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): var = dict(variable) fx_files_dict = {} @@ -504,7 +508,7 @@ def _update_fx_settings(settings, variable, config_user): fxvar_name = fxvar["short_name"] except KeyError: raise RecipeError( - "No valid short_name found in {}".format(fxvar) + "No entry short_name found in {}".format(fxvar) ) _, cmor_fx_var = _get_correct_fx_file( variable, fxvar, config_user) @@ -519,6 +523,14 @@ def _update_fx_settings(settings, variable, config_user): logger.info(msg, step, pformat(fx_files_dict)) +def _update_fx_settings(settings, variable, config_user): + """Find and set the general FX mask/stats settings.""" + # get settings for fixed fx masks: land, sea or seaice or weighted + _get_fx_mask_settings(variable, settings, config_user) + # get settings for fx variables that need preprocessing + _get_fx_stats_settings(variable, settings, config_user) + + def _read_attributes(filename): """Read the attributes from a netcdf file.""" attributes = {} From bfe361171d1602609a848f92662a07fb3354e2a4 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 12:33:45 +0000 Subject: [PATCH 09/34] more Klaus suggestions and a change of flag name --- esmvalcore/_recipe.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index f8f6ea5cdf..5f33bc4a88 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -710,11 +710,11 @@ def get_matching(attributes): def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user, fx_case=False): + config_user, var_fxvar_coupling=False): """Get preprocessor product definitions for a set of datasets.""" products = set() - if not fx_case: + if not var_fxvar_coupling: for variable in variables: variable['filename'] = get_output_file(variable, config_user['preproc_dir']) @@ -747,9 +747,10 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings(settings=settings, - variable=variable, - config_user=config_user) + _update_fx_settings( + settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -787,7 +788,7 @@ def _get_single_preprocessor_task(variables, config_user, name, ancestor_tasks=None, - fx_case=False): + var_fxvar_coupling=False): """Create preprocessor tasks for a set of datasets w/ special case fx.""" if ancestor_tasks is None: ancestor_tasks = [] @@ -798,7 +799,7 @@ def _get_single_preprocessor_task(variables, check.check_for_temporal_preprocs(profile) ancestor_products = None - if not fx_case: + if not var_fxvar_coupling: products = _get_preprocessor_products( variables=variables, profile=profile, @@ -813,7 +814,7 @@ def _get_single_preprocessor_task(variables, order=order, ancestor_products=ancestor_products, config_user=config_user, - fx_case=True + var_fxvar_coupling=True ) if not products: @@ -997,7 +998,7 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): before, config_user, name=fx_name, - fx_case=True + var_fxvar_coupling=True ) fx_preproc_tasks.append(task) variable_pairs.append([var, fx_variable]) From e1c197cf70dc611d7ffe39706900716b61c14465 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 12:43:20 +0000 Subject: [PATCH 10/34] moar Klaus suggestions --- esmvalcore/_recipe.py | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 5f33bc4a88..bcb92f3a50 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -711,7 +711,13 @@ def get_matching(attributes): def _get_preprocessor_products(variables, profile, order, ancestor_products, config_user, var_fxvar_coupling=False): - """Get preprocessor product definitions for a set of datasets.""" + """ + Get preprocessor product definitions for a set of datasets. + + + It updates recipe settings as needed by various preprocessors + and sets the correct ancestry. + """ products = set() if not var_fxvar_coupling: @@ -799,23 +805,14 @@ def _get_single_preprocessor_task(variables, check.check_for_temporal_preprocs(profile) ancestor_products = None - if not var_fxvar_coupling: - products = _get_preprocessor_products( - variables=variables, - profile=profile, - order=order, - ancestor_products=ancestor_products, - config_user=config_user, - ) - else: - products = _get_preprocessor_products( - variables=variables, - profile=profile, - order=order, - ancestor_products=ancestor_products, - config_user=config_user, - var_fxvar_coupling=True - ) + products = _get_preprocessor_products( + variables=variables, + profile=profile, + order=order, + ancestor_products=ancestor_products, + config_user=config_user, + var_fxvar_coupling=var_fxvar_coupling + ) if not products: raise RecipeError( @@ -914,8 +911,8 @@ def append(group_prefix, var): return derive_input -def _remove_time_preproc(fxprofile): - """Remove all time preprocessors from the fx profile.""" +def _get_filtered_fxprofile(fxprofile): + """Remove all time preprocessors from the fx profile (returns a copy).""" fxprofile = deepcopy(fxprofile) for key in fxprofile: if key in TIME_PREPROCESSORS: @@ -989,7 +986,7 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): # remove time preprocessors for any fx/Ofx/Efx/etc # that dont have time coords if fx_variable['frequency'] == 'fx': - before = _remove_time_preproc(before) + before = _get_filtered_fxprofile(before) fx_name = task_name.split( TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ fx_variable['variable_group'] From 085bf1f0bd6018e9c3f08d4fa14dd5da9db809e8 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 12:53:43 +0000 Subject: [PATCH 11/34] moar Klaus suggestions --- esmvalcore/_recipe.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index bcb92f3a50..16534ee1b3 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -1023,8 +1023,9 @@ def _check_duplication(task): for anc_product in ancestor_task.products: ancestors_filename_dict[anc_product.filename] = ancestor_task filenames.append(anc_product.filename) - set_ancestors = list( - {ancestors_filename_dict[filename] for filename in filenames}) + set_ancestors = { + ancestors_filename_dict[filename] for filename in filenames + } task.ancestors = [ ancestor for ancestor in task.ancestors if ancestor in set_ancestors @@ -1120,10 +1121,10 @@ def _initialize_datasets(raw_datasets): @staticmethod def _expand_ensemble(variables): - """Expand ensemble members to multiple datasets. - - Expansion only support ensembles defined as strings, not lists. + """ + Expand ensemble members to multiple datasets. + Expansion only supports ensembles defined as strings, not lists. """ expanded = [] regex = re.compile(r'\(\d+:\d+\)') From a35ac22adfabd22285ffbadfe7bf2c84fae8c316 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 13:06:32 +0000 Subject: [PATCH 12/34] last bit of Klaus suggestion --- esmvalcore/_recipe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 16534ee1b3..7ce0197069 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -387,11 +387,11 @@ def _get_correct_fx_file(variable, fx_variable, config_user): if 'mip' in fx_variable: fx_mips = [fx_variable['mip']] else: - # Get all fx-related mips ('fx' always first, original mip last) - fx_mips = ['fx'] + # Get all fx-related mips (original var mip, + # 'fx' and extend from cmor tables) + fx_mips = [variable['mip'], 'fx'] fx_mips.extend( [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) - fx_mips.append(variable['mip']) # Search all mips for available variables # priority goes to user specified mip if available From 39fbd19599c366c093fadfea30a4ff4a9730eb06 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 13:06:51 +0000 Subject: [PATCH 13/34] changed test in light of recipe change --- tests/integration/test_recipe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index 4952502ba6..604d8411cd 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1891,8 +1891,8 @@ def test_fx_vars_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, fx_files = settings['fx_files'] assert isinstance(fx_files, dict) assert len(fx_files) == 1 - assert '_Ofx_' in fx_files['volcello'] - assert '_Omon_' not in fx_files['volcello'] + assert '_Omon_' in fx_files['volcello'] + assert '_Ofx_' not in fx_files['volcello'] def test_fx_dicts_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, From bb40b37877f5b8a859fce7c0c58194fbd487c1e5 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 13:46:42 +0000 Subject: [PATCH 14/34] reafctored get_preprocessor_task and added a separate function for fx vars preproc ancestry for ease of removal in the future --- esmvalcore/_recipe.py | 86 +++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 7ce0197069..c2a2efe7ed 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -921,6 +921,49 @@ def _get_filtered_fxprofile(fxprofile): return fxprofile +def _add_fxvar_preprocessing_ancestors(step, profile, variables, + config_user, task_name): + """Bolt on the fx vars preproc ancestors, if needed.""" + fx_preproc_tasks = [] + # conserve profile + fx_profile = deepcopy(profile) + fx_vars = profile.get(step, {}).get('fx_files') + + # Create tasks to prepare the input data for the fx var + order = _extract_preprocessor_order(fx_profile) + for var in variables: + fx_variables = [ + _get_correct_fx_file(var, fx_var, config_user)[1] + for fx_var in fx_vars + ] + for fx_variable in fx_variables: + # list may be (intentionally) empty - catch it here + if not fx_variable: + raise RecipeError( + f"One or more of {step} fx data " + f"for {var['short_name']} are missing. " + f"Task can not be performed since there is " + f"no fx data found.") + before, _ = _split_settings(fx_profile, step, order) + # remove time preprocessors for any fx/Ofx/Efx/etc + # that dont have time coords + if fx_variable['frequency'] == 'fx': + before = _get_filtered_fxprofile(before) + fx_name = task_name.split( + TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ + fx_variable['variable_group'] + task = _get_single_preprocessor_task( + [fx_variable, var], + before, + config_user, + name=fx_name, + var_fxvar_coupling=True + ) + fx_preproc_tasks.append(task) + + return fx_preproc_tasks + + def _get_preprocessor_task(variables, profiles, config_user, task_name): """Create preprocessor task(s) for a set of datasets.""" # First set up the preprocessor profile @@ -939,7 +982,6 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): _add_cmor_info(variable) # Create preprocessor task(s) derive_tasks = [] - fx_preproc_tasks = [] # set up tasks if variable.get('derive'): # Create tasks to prepare the input data for the derive step @@ -960,47 +1002,11 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): derive_tasks.append(task) # special case: fx variable pre-processing - variable_pairs = [] for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): if profile.get(step, {}).get('fx_files'): - # conserve profile - fx_profile = deepcopy(profile) - fx_vars = profile.get(step, {}).get('fx_files') - - # Create tasks to prepare the input data for the fx var - order = _extract_preprocessor_order(fx_profile) - for var in variables: - fx_variables = [ - _get_correct_fx_file(var, fx_var, config_user)[1] - for fx_var in fx_vars - ] - for fx_variable in fx_variables: - # list may be (intentionally) empty - catch it here - if not fx_variable: - raise RecipeError( - f"One or more of {step} fx data " - f"for {var['short_name']} are missing. " - f"Task can not be performed since there is " - f"no fx data found.") - before, _ = _split_settings(fx_profile, step, order) - # remove time preprocessors for any fx/Ofx/Efx/etc - # that dont have time coords - if fx_variable['frequency'] == 'fx': - before = _get_filtered_fxprofile(before) - fx_name = task_name.split( - TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ - fx_variable['variable_group'] - task = _get_single_preprocessor_task( - [fx_variable, var], - before, - config_user, - name=fx_name, - var_fxvar_coupling=True - ) - fx_preproc_tasks.append(task) - variable_pairs.append([var, fx_variable]) - - derive_tasks.extend(fx_preproc_tasks) + derive_tasks.extend( + _add_fxvar_preprocessing_ancestors(step, profile, variables, + config_user, task_name)) # Create (final) preprocessor task task = _get_single_preprocessor_task( From a293aca4762f1d043c35a435af2214d13bafdb8d Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 25 Feb 2020 14:46:01 +0000 Subject: [PATCH 15/34] put in nested def so to eliminate code duplication --- esmvalcore/_recipe.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index c2a2efe7ed..d33376f566 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -469,12 +469,22 @@ def _update_weighting_settings(settings, variable): def _get_fx_mask_settings(variable, settings, config_user): """Assemble the fx var structures for land/sea masking.""" msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - if 'mask_landsea' in settings: - logger.debug('Getting land-sea fx mask settings now...') + + def _update_landsea(operation, variable, config_user): + """Update settings for either landsea or weighted landsea.""" + logger.debug('Getting {operation} fx mask settings now...') fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] - settings['mask_landsea']['fx_files'] = fx_list - logger.info(msg, 'land/sea masking', pformat(fx_dict)) + if operation == 'mask_landsea': + settings[operation]['fx_files'] = fx_list + elif operation == 'weighting_landsea_fraction': + settings[operation]['fx_files'] = fx_dict + logger.info(msg, '{operation} masking', pformat(fx_dict)) + + landsea_ops = ['mask_landsea', 'weighting_landsea_fraction'] + for landsea_op in landsea_ops: + if landsea_op in settings: + _update_landsea(landsea_op, variable, config_user) if 'mask_landseaice' in settings: logger.debug('Getting land-seaice fx mask settings now...') @@ -486,12 +496,6 @@ def _get_fx_mask_settings(variable, settings, config_user): fx_files_dict['sftgif']) logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) - if 'weighting_landsea_fraction' in settings: - logger.debug("Getting fx files for landsea fraction weighting now...") - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - settings['weighting_landsea_fraction']['fx_files'] = fx_dict - logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) - def _get_fx_stats_settings(variable, settings, config_user): """Assemble the fx vars structures for area/volume stats.""" From facad13b085613a59b6e7ba373e8b2c794e84173 Mon Sep 17 00:00:00 2001 From: Klaus Zimmermann Date: Wed, 26 Feb 2020 16:51:52 +0100 Subject: [PATCH 16/34] Refactor fx settings --- esmvalcore/_recipe.py | 124 ++++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 59 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index c2a2efe7ed..059bdfa2e5 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -466,69 +466,75 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _get_fx_mask_settings(variable, settings, config_user): - """Assemble the fx var structures for land/sea masking.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - if 'mask_landsea' in settings: - logger.debug('Getting land-sea fx mask settings now...') - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] - settings['mask_landsea']['fx_files'] = fx_list - logger.info(msg, 'land/sea masking', pformat(fx_dict)) - - if 'mask_landseaice' in settings: - logger.debug('Getting land-seaice fx mask settings now...') - settings['mask_landseaice']['fx_files'] = [] - fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) - fx_files_dict = {'sftgif': fx_file} - if fx_files_dict['sftgif']: - settings['mask_landseaice']['fx_files'].append( - fx_files_dict['sftgif']) - logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) - - if 'weighting_landsea_fraction' in settings: - logger.debug("Getting fx files for landsea fraction weighting now...") - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - settings['weighting_landsea_fraction']['fx_files'] = fx_dict - logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) - - -def _get_fx_stats_settings(variable, settings, config_user): +def _update_fx_settings_mask(settings, variable, config_user, as_list): + # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" + logger.debug('Getting land-sea fx mask settings now...') + fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) + if as_list: + settings['fx_files'] = [fx_file + for fx_file in fx_dict.values() + if fx_file] + else: + settings['fx_files'] = fx_dict + + +def _update_fx_settings_mask_landseaice(settings, + variable, + config_user): + logger.debug('Getting land-seaice fx mask settings now...') + settings['fx_files'] = [] + fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) + fx_files_dict = {'sftgif': fx_file} + if fx_files_dict['sftgif']: + settings['fx_files'].append(fx_files_dict['sftgif']) + # logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) + + +def _update_fx_settings_stats(settings, variable, config_user): """Assemble the fx vars structures for area/volume stats.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): - var = dict(variable) - fx_files_dict = {} - var['fx_files'] = settings.get(step, {}).get('fx_files') - if var['fx_files']: - for fxvar in var['fx_files']: - fxvar_name = fxvar - if isinstance(fxvar, dict): - try: - fxvar_name = fxvar["short_name"] - except KeyError: - raise RecipeError( - "No entry short_name found in {}".format(fxvar) - ) - _, cmor_fx_var = _get_correct_fx_file( - variable, fxvar, config_user) - if cmor_fx_var: - fx_files_dict[fxvar_name] = get_output_file( - var, - config_user['preproc_dir'], - fx_var_alias=cmor_fx_var) - - # finally construct the fx files dictionary - settings[step]['fx_files'] = fx_files_dict - logger.info(msg, step, pformat(fx_files_dict)) + # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" + var = dict(variable) + fx_files_dict = {} + var['fx_files'] = settings.get('fx_files') + if var['fx_files']: + for fxvar in var['fx_files']: + fxvar_name = fxvar + if isinstance(fxvar, dict): + try: + fxvar_name = fxvar["short_name"] + except KeyError: + raise RecipeError( + "No entry short_name found in {}".format(fxvar) + ) + _, cmor_fx_var = _get_correct_fx_file( + variable, fxvar, config_user) + if cmor_fx_var: + fx_files_dict[fxvar_name] = get_output_file( + var, + config_user['preproc_dir'], + fx_var_alias=cmor_fx_var) + + # finally construct the fx files dictionary + settings['fx_files'] = fx_files_dict + # logger.info(msg, step, pformat(fx_files_dict)) def _update_fx_settings(settings, variable, config_user): - """Find and set the general FX mask/stats settings.""" - # get settings for fixed fx masks: land, sea or seaice or weighted - _get_fx_mask_settings(variable, settings, config_user) - # get settings for fx variables that need preprocessing - _get_fx_stats_settings(variable, settings, config_user) + update_methods = { + 'mask_landsea': + (_update_fx_settings_mask, {'as_list': True}), + 'mask_landseaice': + (_update_fx_settings_mask_landseaice, {}), + 'weighting_landsea_fraction': + (_update_fx_settings_mask, {'as_list': False}), + 'area_statistics': (_update_fx_settings_stats, {}), + 'volume_statistics': (_update_fx_settings_stats, {}), + 'zonal_statistics': (_update_fx_settings_stats, {}), + } + for step_name, step_settings in settings.items(): + update_method, kwargs = update_methods.get(step_name, (None, {})) + if update_method: + update_method(step_settings, variable, config_user, **kwargs) def _read_attributes(filename): From cc0ca0c73efe8e8a496389c61add4f569c24fb78 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Wed, 26 Feb 2020 16:35:58 +0000 Subject: [PATCH 17/34] last suggestion from hero Klaus --- esmvalcore/_recipe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index d33376f566..5bcae80d79 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -729,8 +729,7 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, variable['filename'] = get_output_file(variable, config_user['preproc_dir']) else: - parent_variable = variables[1] - variables = [variables[0]] + parent_variable = variables.pop() for variable in variables: variable['filename'] = get_output_file(parent_variable, config_user['preproc_dir'], From 6a5cf431fae3d0512e6a808e9b026555c7455383 Mon Sep 17 00:00:00 2001 From: Klaus Zimmermann Date: Wed, 26 Feb 2020 18:03:31 +0100 Subject: [PATCH 18/34] Refactor fx_files settings updating --- esmvalcore/_recipe.py | 46 +++++++++++++------------------------------ 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 059bdfa2e5..a96b1d387f 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -434,19 +434,6 @@ def _get_correct_fx_file(variable, fx_variable, config_user): return fx_files, valid_fx_vars -def _get_landsea_fraction_fx_dict(variable, config_user): - """Get dict of available ``sftlf`` and ``sftof`` variables.""" - fx_dict = {} - fx_vars = ['sftlf'] - if variable['project'] != 'obs4mips': - fx_vars.append('sftof') - for fx_var in fx_vars: - fx_file, _ = _get_correct_fx_file(variable, fx_var, config_user) - fx_dict[fx_var] = fx_file - - return fx_dict - - def _exclude_dataset(settings, variable, step): """Exclude dataset from specific preprocessor step if requested.""" exclude = { @@ -466,30 +453,19 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _update_fx_settings_mask(settings, variable, config_user, as_list): +def _update_fx_settings_mask(settings, variable, config_user, + fx_vars, as_list): # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" logger.debug('Getting land-sea fx mask settings now...') - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) + fx_dict = {fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] + for fx_var in fx_vars} if as_list: - settings['fx_files'] = [fx_file - for fx_file in fx_dict.values() + settings['fx_files'] = [fx_file for fx_file in fx_dict.values() if fx_file] else: settings['fx_files'] = fx_dict -def _update_fx_settings_mask_landseaice(settings, - variable, - config_user): - logger.debug('Getting land-seaice fx mask settings now...') - settings['fx_files'] = [] - fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) - fx_files_dict = {'sftgif': fx_file} - if fx_files_dict['sftgif']: - settings['fx_files'].append(fx_files_dict['sftgif']) - # logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) - - def _update_fx_settings_stats(settings, variable, config_user): """Assemble the fx vars structures for area/volume stats.""" # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" @@ -520,13 +496,19 @@ def _update_fx_settings_stats(settings, variable, config_user): def _update_fx_settings(settings, variable, config_user): + landseamask_vars = ['sftlf'] + if variable['project'] != 'obs4mips': + landseamask_vars.append('sftof') update_methods = { 'mask_landsea': - (_update_fx_settings_mask, {'as_list': True}), + (_update_fx_settings_mask, {'as_list': True, + 'fx_vars': landseamask_vars}), 'mask_landseaice': - (_update_fx_settings_mask_landseaice, {}), + (_update_fx_settings_mask, {'as_list': True, + 'fx_vars': ['sftgif']}), 'weighting_landsea_fraction': - (_update_fx_settings_mask, {'as_list': False}), + (_update_fx_settings_mask, {'as_list': False, + 'fx_vars': landseamask_vars}), 'area_statistics': (_update_fx_settings_stats, {}), 'volume_statistics': (_update_fx_settings_stats, {}), 'zonal_statistics': (_update_fx_settings_stats, {}), From ca6823864a787f4bcdbc58954f287132cecf0843 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2020 14:14:44 +0000 Subject: [PATCH 19/34] fixed the fx var handling for spatial stats --- esmvalcore/_recipe.py | 186 +++++++++++++++++++++++------------------- 1 file changed, 100 insertions(+), 86 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index a96b1d387f..ded7a3f679 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -175,8 +175,8 @@ def _update_target_levels(variable, variables, settings, config_user): short_name=variable_data['short_name'], mip=variable_data['mip'], frequency=variable_data['frequency'], - fix_dir=os.path.splitext( - variable_data['filename'])[0] + '_fixed', + fix_dir=os.path.splitext(variable_data['filename'])[0] + + '_fixed', ) @@ -232,8 +232,8 @@ def _dataset_to_file(variable, config_user): for required_var in required_vars: _augment(required_var, variable) _add_cmor_info(required_var, override=True) - (files, dirnames, filenames) = _get_input_files(required_var, - config_user) + (files, dirnames, + filenames) = _get_input_files(required_var, config_user) if files: variable = required_var break @@ -453,70 +453,86 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _update_fx_settings_mask(settings, variable, config_user, - fx_vars, as_list): - # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" +def _update_fx_settings_mask(settings, variable, config_user, fx_vars, + as_list): + """Update settings with mask fx file list or dict.""" logger.debug('Getting land-sea fx mask settings now...') - fx_dict = {fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] - for fx_var in fx_vars} + fx_dict = { + fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] + for fx_var in fx_vars + } if as_list: - settings['fx_files'] = [fx_file for fx_file in fx_dict.values() - if fx_file] + fx_list = [ + fx_file for fx_file in fx_dict.values() if fx_file + ] + settings['fx_files'] = fx_list else: settings['fx_files'] = fx_dict + logger.info(pformat(settings['fx_files'])) -def _update_fx_settings_stats(settings, variable, config_user): - """Assemble the fx vars structures for area/volume stats.""" - # msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - var = dict(variable) - fx_files_dict = {} - var['fx_files'] = settings.get('fx_files') - if var['fx_files']: - for fxvar in var['fx_files']: - fxvar_name = fxvar - if isinstance(fxvar, dict): - try: - fxvar_name = fxvar["short_name"] - except KeyError: - raise RecipeError( - "No entry short_name found in {}".format(fxvar) - ) - _, cmor_fx_var = _get_correct_fx_file( - variable, fxvar, config_user) - if cmor_fx_var: - fx_files_dict[fxvar_name] = get_output_file( - var, - config_user['preproc_dir'], - fx_var_alias=cmor_fx_var) - - # finally construct the fx files dictionary - settings['fx_files'] = fx_files_dict - # logger.info(msg, step, pformat(fx_files_dict)) +def _update_fx_settings_stats(settings, variable, config_user, fx_vars): + """Update settings with spatial analysis fx vars files dict.""" + if not fx_vars: + return + fx_vars = [ + _get_correct_fx_file(variable, fxvar, config_user)[1] + for fxvar in fx_vars + ] + fx_dict = { + fx_var['short_name']: get_output_file(variable, + config_user['preproc_dir'], + fx_var_alias=fx_var) + for fx_var in fx_vars if fx_var + } + settings['fx_files'] = fx_dict + logger.info(pformat(settings['fx_files'])) def _update_fx_settings(settings, variable, config_user): + """Update fx settings depending on the needed method.""" + msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" + # landsea mask fx variables landseamask_vars = ['sftlf'] if variable['project'] != 'obs4mips': landseamask_vars.append('sftof') + + # spatial stats fx variables + def _get_spatial_stats_fx_vars(settings, step_name): + spatial_stats_fx_vars = settings.get(step_name, {}).get('fx_files') + return spatial_stats_fx_vars + update_methods = { - 'mask_landsea': - (_update_fx_settings_mask, {'as_list': True, - 'fx_vars': landseamask_vars}), - 'mask_landseaice': - (_update_fx_settings_mask, {'as_list': True, - 'fx_vars': ['sftgif']}), - 'weighting_landsea_fraction': - (_update_fx_settings_mask, {'as_list': False, - 'fx_vars': landseamask_vars}), - 'area_statistics': (_update_fx_settings_stats, {}), - 'volume_statistics': (_update_fx_settings_stats, {}), - 'zonal_statistics': (_update_fx_settings_stats, {}), + 'mask_landsea': (_update_fx_settings_mask, { + 'as_list': True, + 'fx_vars': landseamask_vars + }), + 'mask_landseaice': (_update_fx_settings_mask, { + 'as_list': True, + 'fx_vars': ['sftgif'] + }), + 'weighting_landsea_fraction': (_update_fx_settings_mask, { + 'as_list': False, + 'fx_vars': landseamask_vars + }), + 'area_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'area_statistics') + }), + 'volume_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'volume_statistics') + }), + 'zonal_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'zonal_statistics') + }), } for step_name, step_settings in settings.items(): update_method, kwargs = update_methods.get(step_name, (None, {})) if update_method: update_method(step_settings, variable, config_user, **kwargs) + logger.info(msg, step_name) def _read_attributes(filename): @@ -534,10 +550,10 @@ def _read_attributes(filename): def _get_input_files(variable, config_user): """Get the input files for a single dataset (locally and via download).""" - (input_files, dirnames, filenames) = get_input_filelist( - variable=variable, - rootpath=config_user['rootpath'], - drs=config_user['drs']) + (input_files, dirnames, + filenames) = get_input_filelist(variable=variable, + rootpath=config_user['rootpath'], + drs=config_user['drs']) # Set up downloading using synda if requested. # Do not download if files are already available locally. @@ -551,8 +567,8 @@ def _get_input_files(variable, config_user): def _get_ancestors(variable, config_user): """Get the input files for a single dataset and setup provenance.""" - (input_files, dirnames, filenames) = _get_input_files(variable, - config_user) + (input_files, dirnames, + filenames) = _get_input_files(variable, config_user) logger.info("Using input files for variable %s of dataset %s:\n%s", variable['short_name'], variable['dataset'], @@ -697,8 +713,12 @@ def get_matching(attributes): return grouped_products -def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user, var_fxvar_coupling=False): +def _get_preprocessor_products(variables, + profile, + order, + ancestor_products, + config_user, + var_fxvar_coupling=False): """ Get preprocessor product definitions for a set of datasets. @@ -741,10 +761,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings( - settings=settings, - variable=variable, - config_user=config_user) + _update_fx_settings(settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -754,8 +773,8 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, _update_regrid_time(variable, settings) ancestors = grouped_ancestors.get(variable['filename']) fx_files_in_settings = [ - setting.get('fx_files') for setting in settings.values() if - setting.get('fx_files') is not None + setting.get('fx_files') for setting in settings.values() + if setting.get('fx_files') is not None ] if not ancestors or fx_files_in_settings: ancestors = _get_ancestors(variable, config_user) @@ -799,8 +818,7 @@ def _get_single_preprocessor_task(variables, order=order, ancestor_products=ancestor_products, config_user=config_user, - var_fxvar_coupling=var_fxvar_coupling - ) + var_fxvar_coupling=var_fxvar_coupling) if not products: raise RecipeError( @@ -874,8 +892,7 @@ def append(group_prefix, var): for variable in variables: group_prefix = variable['variable_group'] + '_derive_input_' if not variable.get('force_derivation') and _get_input_files( - variable, - config_user)[0]: + variable, config_user)[0]: # No need to derive, just process normally up to derive step var = deepcopy(variable) append(group_prefix, var) @@ -909,8 +926,8 @@ def _get_filtered_fxprofile(fxprofile): return fxprofile -def _add_fxvar_preprocessing_ancestors(step, profile, variables, - config_user, task_name): +def _add_fxvar_preprocessing_ancestors(step, profile, variables, config_user, + task_name): """Bolt on the fx vars preproc ancestors, if needed.""" fx_preproc_tasks = [] # conserve profile @@ -927,11 +944,10 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables, for fx_variable in fx_variables: # list may be (intentionally) empty - catch it here if not fx_variable: - raise RecipeError( - f"One or more of {step} fx data " - f"for {var['short_name']} are missing. " - f"Task can not be performed since there is " - f"no fx data found.") + raise RecipeError(f"One or more of {step} fx data " + f"for {var['short_name']} are missing. " + f"Task can not be performed since there is " + f"no fx data found.") before, _ = _split_settings(fx_profile, step, order) # remove time preprocessors for any fx/Ofx/Efx/etc # that dont have time coords @@ -940,13 +956,11 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables, fx_name = task_name.split( TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ fx_variable['variable_group'] - task = _get_single_preprocessor_task( - [fx_variable, var], - before, - config_user, - name=fx_name, - var_fxvar_coupling=True - ) + task = _get_single_preprocessor_task([fx_variable, var], + before, + config_user, + name=fx_name, + var_fxvar_coupling=True) fx_preproc_tasks.append(task) return fx_preproc_tasks @@ -1018,7 +1032,8 @@ def _check_duplication(task): ancestors_filename_dict[anc_product.filename] = ancestor_task filenames.append(anc_product.filename) set_ancestors = { - ancestors_filename_dict[filename] for filename in filenames + ancestors_filename_dict[filename] + for filename in filenames } task.ancestors = [ ancestor for ancestor in task.ancestors @@ -1031,9 +1046,8 @@ def _check_duplication(task): class Recipe: """Recipe object.""" - info_keys = ( - 'project', 'activity', 'dataset', 'exp', 'ensemble', 'version' - ) + info_keys = ('project', 'activity', 'dataset', 'exp', 'ensemble', + 'version') """List of keys to be used to compose the alias, ordered by priority.""" def __init__(self, raw_recipe, From 2afe12f615291f7d11d349459f259ec62ccee10a Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2020 14:58:11 +0000 Subject: [PATCH 20/34] slight reformatting of logger message --- esmvalcore/_recipe.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 5c0e166de7..15b97b3b48 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -491,7 +491,7 @@ def _update_fx_settings_stats(settings, variable, config_user, fx_vars): def _update_fx_settings(settings, variable, config_user): """Update fx settings depending on the needed method.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" + msg = f"Using fx files for %s of dataset %s:\n%s" # landsea mask fx variables landseamask_vars = ['sftlf'] if variable['project'] != 'obs4mips': @@ -532,7 +532,8 @@ def _get_spatial_stats_fx_vars(settings, step_name): update_method, kwargs = update_methods.get(step_name, (None, {})) if update_method: update_method(step_settings, variable, config_user, **kwargs) - logger.info(msg, step_name) + logger.info(msg, variable['short_name'], + variable['dataset'], step_name) def _read_attributes(filename): From 5bd033ebbacbeb9d10ad3a72b5c55fb28d10c55e Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 14:27:12 +0000 Subject: [PATCH 21/34] renamed function to more appropriate name --- esmvalcore/_recipe.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 16b28335c1..f830abce6d 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -498,9 +498,9 @@ def _update_fx_settings(settings, variable, config_user): landseamask_vars.append('sftof') # spatial stats fx variables - def _get_spatial_stats_fx_vars(settings, step_name): - spatial_stats_fx_vars = settings.get(step_name, {}).get('fx_files') - return spatial_stats_fx_vars + def _get_fx_vars_from_attribute(settings, step_name): + user_fx_vars = settings.get(step_name, {}).get('fx_files') + return user_fx_vars update_methods = { 'mask_landsea': (_update_fx_settings_mask, { @@ -517,15 +517,15 @@ def _get_spatial_stats_fx_vars(settings, step_name): }), 'area_statistics': (_update_fx_settings_stats, { 'fx_vars': - _get_spatial_stats_fx_vars(settings, 'area_statistics') + _get_fx_vars_from_attribute(settings, 'area_statistics') }), 'volume_statistics': (_update_fx_settings_stats, { 'fx_vars': - _get_spatial_stats_fx_vars(settings, 'volume_statistics') + _get_fx_vars_from_attribute(settings, 'volume_statistics') }), 'zonal_statistics': (_update_fx_settings_stats, { 'fx_vars': - _get_spatial_stats_fx_vars(settings, 'zonal_statistics') + _get_fx_vars_from_attribute(settings, 'zonal_statistics') }), } for step_name, step_settings in settings.items(): From 2194cb669f773baa477f25396e378f131d273f88 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 15:09:45 +0000 Subject: [PATCH 22/34] unified functions and made it possible for user to define their own fx_files in preprocessor step --- esmvalcore/_recipe.py | 112 +++++++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 51 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index f830abce6d..e4a914110a 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -453,77 +453,86 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _update_fx_settings_mask(settings, variable, config_user, fx_vars, - as_list): +def _update_fx_files(step, settings, variable, config_user, fx_vars, as_list): """Update settings with mask fx file list or dict.""" - logger.debug('Getting land-sea fx mask settings now...') - fx_dict = { - fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] - for fx_var in fx_vars - } - if as_list: - fx_list = [ - fx_file for fx_file in fx_dict.values() if fx_file + logger.debug('Getting fx settings for {} now...'.format(step)) + non_preproc_fx_steps = [ + 'mask_landsea', 'mask_landseaice', 'weighting_landsea_fraction' + ] + preproc_fx_steps = [ + 'area_statistics', 'volume_statistics', 'zonal_statistics' + ] + + if step in non_preproc_fx_steps: + fx_dict = { + fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] + for fx_var in fx_vars + } + if as_list: + fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] + settings['fx_files'] = fx_list + else: + settings['fx_files'] = fx_dict + elif step in preproc_fx_steps: + if not fx_vars: + return + fx_vars = [ + _get_correct_fx_file(variable, fxvar, config_user)[1] + for fxvar in fx_vars ] - settings['fx_files'] = fx_list - else: + fx_dict = { + fx_var['short_name']: get_output_file(variable, + config_user['preproc_dir'], + fx_var_alias=fx_var) + for fx_var in fx_vars if fx_var + } settings['fx_files'] = fx_dict - logger.info(pformat(settings['fx_files'])) - - -def _update_fx_settings_stats(settings, variable, config_user, fx_vars): - """Update settings with spatial analysis fx vars files dict.""" - if not fx_vars: - return - fx_vars = [ - _get_correct_fx_file(variable, fxvar, config_user)[1] - for fxvar in fx_vars - ] - fx_dict = { - fx_var['short_name']: get_output_file(variable, - config_user['preproc_dir'], - fx_var_alias=fx_var) - for fx_var in fx_vars if fx_var - } - settings['fx_files'] = fx_dict - logger.info(pformat(settings['fx_files'])) + logger.info('Using fx_files: %s', pformat(settings['fx_files'])) def _update_fx_settings(settings, variable, config_user): """Update fx settings depending on the needed method.""" msg = f"Using fx files for %s of dataset %s:\n%s" - # landsea mask fx variables - landseamask_vars = ['sftlf'] - if variable['project'] != 'obs4mips': - landseamask_vars.append('sftof') - # spatial stats fx variables + # get fx variables either from user defined attribute or fixed def _get_fx_vars_from_attribute(settings, step_name): user_fx_vars = settings.get(step_name, {}).get('fx_files') + if not user_fx_vars: + user_fx_vars = ['sftlf'] + if variable['project'] != 'obs4mips': + user_fx_vars.append('sftof') + if step_name == 'mask_landseaice': + user_fx_vars = ['sftgif'] return user_fx_vars update_methods = { - 'mask_landsea': (_update_fx_settings_mask, { - 'as_list': True, - 'fx_vars': landseamask_vars + 'mask_landsea': (_update_fx_files, { + 'as_list': + True, + 'fx_vars': + _get_fx_vars_from_attribute(settings, 'mask_landsea') }), - 'mask_landseaice': (_update_fx_settings_mask, { - 'as_list': True, - 'fx_vars': ['sftgif'] + 'mask_landseaice': (_update_fx_files, { + 'as_list': + True, + 'fx_vars': + _get_fx_vars_from_attribute(settings, 'mask_landseaice') }), - 'weighting_landsea_fraction': (_update_fx_settings_mask, { - 'as_list': False, - 'fx_vars': landseamask_vars + 'weighting_landsea_fraction': (_update_fx_files, { + 'as_list': + False, + 'fx_vars': + _get_fx_vars_from_attribute(settings, 'weighting_landsea_fraction') }), - 'area_statistics': (_update_fx_settings_stats, { + 'area_statistics': (_update_fx_files, { 'fx_vars': _get_fx_vars_from_attribute(settings, 'area_statistics') }), - 'volume_statistics': (_update_fx_settings_stats, { + 'volume_statistics': (_update_fx_files, { 'fx_vars': _get_fx_vars_from_attribute(settings, 'volume_statistics') }), - 'zonal_statistics': (_update_fx_settings_stats, { + 'zonal_statistics': (_update_fx_files, { 'fx_vars': _get_fx_vars_from_attribute(settings, 'zonal_statistics') }), @@ -531,9 +540,10 @@ def _get_fx_vars_from_attribute(settings, step_name): for step_name, step_settings in settings.items(): update_method, kwargs = update_methods.get(step_name, (None, {})) if update_method: - update_method(step_settings, variable, config_user, **kwargs) - logger.info(msg, variable['short_name'], - variable['dataset'], step_name) + update_method(step_name, step_settings, variable, config_user, + **kwargs) + logger.info(msg, variable['short_name'], variable['dataset'], + step_name) def _read_attributes(filename): From e32ba97b6229f5171106314631db03f33f099b62 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 15:43:10 +0000 Subject: [PATCH 23/34] removed duplicated redundancy --- esmvalcore/_recipe.py | 49 ++++++++++--------------------------------- 1 file changed, 11 insertions(+), 38 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index e4a914110a..203f8af6e1 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -453,7 +453,7 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _update_fx_files(step, settings, variable, config_user, fx_vars, as_list): +def _update_fx_files(step, settings, variable, config_user, fx_vars): """Update settings with mask fx file list or dict.""" logger.debug('Getting fx settings for {} now...'.format(step)) non_preproc_fx_steps = [ @@ -468,11 +468,6 @@ def _update_fx_files(step, settings, variable, config_user, fx_vars, as_list): fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] for fx_var in fx_vars } - if as_list: - fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] - settings['fx_files'] = fx_list - else: - settings['fx_files'] = fx_dict elif step in preproc_fx_steps: if not fx_vars: return @@ -486,7 +481,7 @@ def _update_fx_files(step, settings, variable, config_user, fx_vars, as_list): fx_var_alias=fx_var) for fx_var in fx_vars if fx_var } - settings['fx_files'] = fx_dict + settings['fx_files'] = fx_dict logger.info('Using fx_files: %s', pformat(settings['fx_files'])) @@ -505,38 +500,16 @@ def _get_fx_vars_from_attribute(settings, step_name): user_fx_vars = ['sftgif'] return user_fx_vars - update_methods = { - 'mask_landsea': (_update_fx_files, { - 'as_list': - True, - 'fx_vars': - _get_fx_vars_from_attribute(settings, 'mask_landsea') - }), - 'mask_landseaice': (_update_fx_files, { - 'as_list': - True, - 'fx_vars': - _get_fx_vars_from_attribute(settings, 'mask_landseaice') - }), - 'weighting_landsea_fraction': (_update_fx_files, { - 'as_list': - False, - 'fx_vars': - _get_fx_vars_from_attribute(settings, 'weighting_landsea_fraction') - }), - 'area_statistics': (_update_fx_files, { - 'fx_vars': - _get_fx_vars_from_attribute(settings, 'area_statistics') - }), - 'volume_statistics': (_update_fx_files, { - 'fx_vars': - _get_fx_vars_from_attribute(settings, 'volume_statistics') - }), - 'zonal_statistics': (_update_fx_files, { + fx_steps = [ + 'mask_landsea', 'mask_landseaice', 'weighting_landsea_fraction', + 'area_statistics', 'volume_statistics', 'zonal_statistics' + ] + update_methods = {} + for step in fx_steps: + update_methods[step] = (_update_fx_files, { 'fx_vars': - _get_fx_vars_from_attribute(settings, 'zonal_statistics') - }), - } + _get_fx_vars_from_attribute(settings, step) + }) for step_name, step_settings in settings.items(): update_method, kwargs = update_methods.get(step_name, (None, {})) if update_method: From e99344b35856f4bf527bd92d7e7c313abaa64fb4 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 15:55:33 +0000 Subject: [PATCH 24/34] let mask take dicts only for fx files --- esmvalcore/preprocessor/_mask.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py index a1dcf72555..dce0ae0396 100644 --- a/esmvalcore/preprocessor/_mask.py +++ b/esmvalcore/preprocessor/_mask.py @@ -100,8 +100,8 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): cube: iris.cube.Cube data cube to be masked. - fx_files: list - list holding the full paths to fx files. + fx_files: dict + dict: keys: fx variables, values: full paths to fx files. mask_out: str either "land" to mask out land mass or "sea" to mask out seas. @@ -131,6 +131,7 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): 'sea': os.path.join(cwd, 'ne_masks/ne_50m_ocean.shp') } + fx_files = list(itertools.chain.from_iterable(fx_files.values())) if fx_files and not always_use_ne_mask: fx_cubes = {} for fx_file in fx_files: @@ -192,8 +193,8 @@ def mask_landseaice(cube, fx_files, mask_out): cube: iris.cube.Cube data cube to be masked. - fx_files: list - list holding the full paths to fx files. + fx_files: dict + dict: keys: fx variables, values: full paths to fx files. mask_out: str either "landsea" to mask out landsea or "ice" to mask out ice. @@ -211,7 +212,8 @@ def mask_landseaice(cube, fx_files, mask_out): Error raised if fx_files list is empty. """ - # sftgif is the only one so far + # sftgif is the only one so far but users can set others + fx_files = list(itertools.chain.from_iterable(fx_files.values())) if fx_files: for fx_file in fx_files: fx_cube = iris.load_cube(fx_file) From 4e62f1f88bf5b2c0b9812bbc44201d20e1c84a57 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:02:23 +0000 Subject: [PATCH 25/34] make mask work with dicts for fx files --- esmvalcore/preprocessor/_mask.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py index dce0ae0396..ad158ff4f3 100644 --- a/esmvalcore/preprocessor/_mask.py +++ b/esmvalcore/preprocessor/_mask.py @@ -7,15 +7,16 @@ missing values masking. """ +import itertools import logging import os import cartopy.io.shapereader as shpreader import iris -from iris.analysis import Aggregator -from iris.util import rolling_window import numpy as np import shapely.vectorized as shp_vect +from iris.analysis import Aggregator +from iris.util import rolling_window logger = logging.getLogger(__name__) From 997d0102716b755b5a40bd2d6adf72b93b3427ee Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:02:38 +0000 Subject: [PATCH 26/34] corrected test for dicts fx files --- tests/integration/test_recipe.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index 604d8411cd..3c47ebfeaf 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1,3 +1,4 @@ +import itertools import os from pathlib import Path from pprint import pformat @@ -1721,7 +1722,8 @@ def test_landmask(tmp_path, patched_datafinder, config_user): assert len(settings) == 2 assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] - assert isinstance(fx_files, list) + assert isinstance(fx_files, dict) + fx_files = list(itertools.chain.from_iterable(fx_files.values())) if product.attributes['project'] == 'obs4mips': assert len(fx_files) == 1 else: @@ -1771,7 +1773,8 @@ def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): assert settings['mask_out'] == 'sea' assert settings['always_use_ne_mask'] is False fx_files = settings['fx_files'] - assert isinstance(fx_files, list) + assert isinstance(fx_files, dict) + fx_files = list(itertools.chain.from_iterable(fx_files.values())) assert fx_files == [] @@ -1838,7 +1841,8 @@ def test_fx_vars_mip_change_cmip6(tmp_path, patched_datafinder, config_user): assert len(settings) == 2 assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] - assert isinstance(fx_files, list) + assert isinstance(fx_files, dict) + fx_files = list(itertools.chain.from_iterable(fx_files.values())) assert len(fx_files) == 2 for fx_file in fx_files: if 'sftlf' in fx_file: @@ -1990,7 +1994,7 @@ def test_fx_vars_list_no_preproc_cmip6(tmp_path, patched_datafinder, assert product.files assert 'area_statistics' in product.settings settings = product.settings['area_statistics'] - assert len(settings) == 1 + assert len(settings) == 2 assert settings['operator'] == 'mean' assert 'fx_files' not in settings From dac52c4bd90f45e1786284967c47969a7cdb06ca Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:26:37 +0000 Subject: [PATCH 27/34] corrected for dicts all over --- .../integration/preprocessor/_mask/test_mask.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/integration/preprocessor/_mask/test_mask.py b/tests/integration/preprocessor/_mask/test_mask.py index c89c091551..afd96ac16f 100644 --- a/tests/integration/preprocessor/_mask/test_mask.py +++ b/tests/integration/preprocessor/_mask/test_mask.py @@ -63,8 +63,10 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) # mask with fx files - result_land = mask_landsea(new_cube_land, ['sftlf_test.nc'], 'land') - result_sea = mask_landsea(new_cube_sea, ['sftlf_test.nc'], 'sea') + result_land = mask_landsea(new_cube_land, + {'sftlf': 'sftlf_test.nc'}, 'land') + result_sea = mask_landsea(new_cube_sea, + {'sftlf': 'sftlf_test.nc'}, 'sea') expected = np.ma.empty((3, 3)) expected.data[:] = 200. expected.mask = np.ones((3, 3), bool) @@ -83,10 +85,10 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) new_cube_sea = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_land = mask_landsea(new_cube_land, ['sftlf_test.nc'], + result_land = mask_landsea(new_cube_land, {'sftlf': 'sftlf_test.nc'}, 'land', always_use_ne_mask=True) - result_sea = mask_landsea(new_cube_sea, ['sftlf_test.nc'], + result_sea = mask_landsea(new_cube_sea, {'sftlf': 'sftlf_test.nc'}, 'sea', always_use_ne_mask=True) @@ -106,8 +108,8 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) new_cube_sea = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_land = mask_landsea(new_cube_land, None, 'land') - result_sea = mask_landsea(new_cube_sea, None, 'sea') + result_land = mask_landsea(new_cube_land, {}, 'land') + result_sea = mask_landsea(new_cube_sea, {}, 'sea') # bear in mind all points are in the ocean np.ma.set_fill_value(result_land.data, 1e+20) @@ -122,7 +124,8 @@ def test_mask_landseaice(self): iris.save(self.fx_mask, 'sftgif_test.nc') new_cube_ice = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_ice = mask_landseaice(new_cube_ice, ['sftgif_test.nc'], 'ice') + result_ice = mask_landseaice(new_cube_ice, + {'sftgif': 'sftgif_test.nc'}, 'ice') expected = np.ma.empty((3, 3)) expected.data[:] = 200. expected.mask = np.ones((3, 3), bool) From 2aab695b888900050a8608b2f229bd8a25988946 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:27:16 +0000 Subject: [PATCH 28/34] cancelled stupid complication with lists of lists --- esmvalcore/preprocessor/_mask.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py index ad158ff4f3..6fd1310aa7 100644 --- a/esmvalcore/preprocessor/_mask.py +++ b/esmvalcore/preprocessor/_mask.py @@ -7,7 +7,6 @@ missing values masking. """ -import itertools import logging import os @@ -132,7 +131,7 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): 'sea': os.path.join(cwd, 'ne_masks/ne_50m_ocean.shp') } - fx_files = list(itertools.chain.from_iterable(fx_files.values())) + fx_files = fx_files.values() if fx_files and not always_use_ne_mask: fx_cubes = {} for fx_file in fx_files: @@ -214,7 +213,7 @@ def mask_landseaice(cube, fx_files, mask_out): """ # sftgif is the only one so far but users can set others - fx_files = list(itertools.chain.from_iterable(fx_files.values())) + fx_files = fx_files.values() if fx_files: for fx_file in fx_files: fx_cube = iris.load_cube(fx_file) From 9cfdd3ad82bd15132b196c8fe2a4c2c74618f864 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:28:45 +0000 Subject: [PATCH 29/34] corrected for simpler approach --- tests/integration/test_recipe.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index 3c47ebfeaf..e313a7dc90 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1,4 +1,3 @@ -import itertools import os from pathlib import Path from pprint import pformat @@ -1723,7 +1722,7 @@ def test_landmask(tmp_path, patched_datafinder, config_user): assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] assert isinstance(fx_files, dict) - fx_files = list(itertools.chain.from_iterable(fx_files.values())) + fx_files = fx_files.values() if product.attributes['project'] == 'obs4mips': assert len(fx_files) == 1 else: @@ -1774,8 +1773,8 @@ def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): assert settings['always_use_ne_mask'] is False fx_files = settings['fx_files'] assert isinstance(fx_files, dict) - fx_files = list(itertools.chain.from_iterable(fx_files.values())) - assert fx_files == [] + fx_files = fx_files.values() + assert not any(fx_files) def test_fx_vars_mip_change_cmip6(tmp_path, patched_datafinder, config_user): @@ -1842,7 +1841,7 @@ def test_fx_vars_mip_change_cmip6(tmp_path, patched_datafinder, config_user): assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] assert isinstance(fx_files, dict) - fx_files = list(itertools.chain.from_iterable(fx_files.values())) + fx_files = fx_files.values() assert len(fx_files) == 2 for fx_file in fx_files: if 'sftlf' in fx_file: @@ -1996,7 +1995,7 @@ def test_fx_vars_list_no_preproc_cmip6(tmp_path, patched_datafinder, settings = product.settings['area_statistics'] assert len(settings) == 2 assert settings['operator'] == 'mean' - assert 'fx_files' not in settings + assert 'fx_files' in settings def test_fx_vars_list_preproc_cmip6(tmp_path, patched_datafinder, From b5a4fc4bc0da99b63d2beb54e920d4b6e972e305 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:41:45 +0000 Subject: [PATCH 30/34] account for empty list of keys --- esmvalcore/preprocessor/_mask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py index 6fd1310aa7..49a25ed5bb 100644 --- a/esmvalcore/preprocessor/_mask.py +++ b/esmvalcore/preprocessor/_mask.py @@ -132,7 +132,7 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): } fx_files = fx_files.values() - if fx_files and not always_use_ne_mask: + if any(fx_files) and not always_use_ne_mask: fx_cubes = {} for fx_file in fx_files: fxfile_members = os.path.basename(fx_file).split('_') @@ -214,7 +214,7 @@ def mask_landseaice(cube, fx_files, mask_out): """ # sftgif is the only one so far but users can set others fx_files = fx_files.values() - if fx_files: + if any(fx_files): for fx_file in fx_files: fx_cube = iris.load_cube(fx_file) From 87fe74f33bcfea0dca566d42ccf23398032d27f5 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 16:43:52 +0000 Subject: [PATCH 31/34] dict comprehensione --- esmvalcore/_recipe.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 203f8af6e1..254b7ac686 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -504,12 +504,12 @@ def _get_fx_vars_from_attribute(settings, step_name): 'mask_landsea', 'mask_landseaice', 'weighting_landsea_fraction', 'area_statistics', 'volume_statistics', 'zonal_statistics' ] - update_methods = {} - for step in fx_steps: - update_methods[step] = (_update_fx_files, { - 'fx_vars': - _get_fx_vars_from_attribute(settings, step) + update_methods = { + step: (_update_fx_files, { + 'fx_vars': _get_fx_vars_from_attribute(settings, step) }) + for step in fx_steps + } for step_name, step_settings in settings.items(): update_method, kwargs = update_methods.get(step_name, (None, {})) if update_method: From 36010e1d97cbaf770239beb5f2456a916556b6b0 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 17:45:47 +0000 Subject: [PATCH 32/34] simplified even more and optimized handling --- esmvalcore/_recipe.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 254b7ac686..cc030ad5d3 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -404,13 +404,13 @@ def _get_correct_fx_file(variable, fx_variable, config_user): searched_mips.append(fx_mip) fx_var_dict['mip'] = fx_mip fx_var_dict = _add_fxvar_keys(fx_var_dict, var) + valid_fx_vars.append(fx_var_dict) logger.debug("For fx variable '%s', found table '%s'", fx_varname, fx_mip) fx_files = _get_input_files(fx_var_dict, config_user)[0] # If files found, return them if fx_files: - valid_fx_vars.append(fx_var_dict) logger.debug("Found fx variables '%s':\n%s", fx_varname, pformat(fx_files)) break @@ -463,23 +463,20 @@ def _update_fx_files(step, settings, variable, config_user, fx_vars): 'area_statistics', 'volume_statistics', 'zonal_statistics' ] + fx_vars = [ + _get_correct_fx_file(variable, fxvar, config_user) + for fxvar in fx_vars + ] + if step in non_preproc_fx_steps: - fx_dict = { - fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] - for fx_var in fx_vars - } + fx_dict = {fx_var[1]['short_name']: fx_var[0] for fx_var in fx_vars} elif step in preproc_fx_steps: - if not fx_vars: - return - fx_vars = [ - _get_correct_fx_file(variable, fxvar, config_user)[1] - for fxvar in fx_vars - ] fx_dict = { - fx_var['short_name']: get_output_file(variable, - config_user['preproc_dir'], - fx_var_alias=fx_var) - for fx_var in fx_vars if fx_var + fx_var[1]['short_name']: + get_output_file(variable, + config_user['preproc_dir'], + fx_var_alias=fx_var[1]) + for fx_var in fx_vars if fx_var[1] } settings['fx_files'] = fx_dict logger.info('Using fx_files: %s', pformat(settings['fx_files'])) From 4e14f0ef17fb2499e5a085c9a8d661d2c9607d20 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 17:46:15 +0000 Subject: [PATCH 33/34] added test for user specified vfx variable for landsea mask --- tests/integration/test_recipe.py | 38 ++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index e313a7dc90..3aa10fbed7 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1729,6 +1729,44 @@ def test_landmask(tmp_path, patched_datafinder, config_user): assert len(fx_files) == 2 +def test_landmask_change_fxvar(tmp_path, patched_datafinder, config_user): + content = dedent(""" + preprocessors: + landmask: + mask_landsea: + mask_out: sea + fx_files: [{'short_name': 'sftlf', 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + gpp: + preprocessor: landmask + project: CMIP5 + mip: Lmon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1 + additional_datasets: + - {dataset: CanESM2} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check custom fx variables + task = recipe.tasks.pop() + product = task.products.pop() + settings = product.settings['mask_landsea'] + assert len(settings) == 2 + assert settings['mask_out'] == 'sea' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_fx_' in fx_files['sftlf'] + assert '_piControl_' in fx_files['sftlf'] + + def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" preprocessors: From b0fa35d27de6c6626109b29165ae924dfc9f6880 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 5 Mar 2020 17:57:11 +0000 Subject: [PATCH 34/34] added one more test and fixed flake shtuff --- tests/integration/test_recipe.py | 43 ++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index 3aa10fbed7..52a90251a8 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1729,13 +1729,13 @@ def test_landmask(tmp_path, patched_datafinder, config_user): assert len(fx_files) == 2 -def test_landmask_change_fxvar(tmp_path, patched_datafinder, config_user): +def test_landseamask_change_fxvar(tmp_path, patched_datafinder, config_user): content = dedent(""" preprocessors: landmask: mask_landsea: mask_out: sea - fx_files: [{'short_name': 'sftlf', 'exp': 'piControl'}] + fx_files: [{'short_name': 'sftlf', 'exp': 'piControl'}] diagnostics: diagnostic_name: @@ -1767,6 +1767,45 @@ def test_landmask_change_fxvar(tmp_path, patched_datafinder, config_user): assert '_piControl_' in fx_files['sftlf'] +def test_landseaicemask_change_fxvar(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + landmask: + mask_landseaice: + mask_out: sea + fx_files: [{'short_name': 'sftgif', 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + gpp: + preprocessor: landmask + project: CMIP5 + mip: Lmon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1 + additional_datasets: + - {dataset: CanESM2} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check custom fx variables + task = recipe.tasks.pop() + product = task.products.pop() + settings = product.settings['mask_landseaice'] + assert len(settings) == 2 + assert settings['mask_out'] == 'sea' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_fx_' in fx_files['sftgif'] + assert '_piControl_' in fx_files['sftgif'] + + def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" preprocessors: