diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index 5bcae80d79..15b97b3b48 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -175,8 +175,8 @@ def _update_target_levels(variable, variables, settings, config_user): short_name=variable_data['short_name'], mip=variable_data['mip'], frequency=variable_data['frequency'], - fix_dir=os.path.splitext( - variable_data['filename'])[0] + '_fixed', + fix_dir=os.path.splitext(variable_data['filename'])[0] + + '_fixed', ) @@ -232,8 +232,8 @@ def _dataset_to_file(variable, config_user): for required_var in required_vars: _augment(required_var, variable) _add_cmor_info(required_var, override=True) - (files, dirnames, filenames) = _get_input_files(required_var, - config_user) + (files, dirnames, + filenames) = _get_input_files(required_var, config_user) if files: variable = required_var break @@ -434,19 +434,6 @@ def _get_correct_fx_file(variable, fx_variable, config_user): return fx_files, valid_fx_vars -def _get_landsea_fraction_fx_dict(variable, config_user): - """Get dict of available ``sftlf`` and ``sftof`` variables.""" - fx_dict = {} - fx_vars = ['sftlf'] - if variable['project'] != 'obs4mips': - fx_vars.append('sftof') - for fx_var in fx_vars: - fx_file, _ = _get_correct_fx_file(variable, fx_var, config_user) - fx_dict[fx_var] = fx_file - - return fx_dict - - def _exclude_dataset(settings, variable, step): """Exclude dataset from specific preprocessor step if requested.""" exclude = { @@ -466,73 +453,87 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') -def _get_fx_mask_settings(variable, settings, config_user): - """Assemble the fx var structures for land/sea masking.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - - def _update_landsea(operation, variable, config_user): - """Update settings for either landsea or weighted landsea.""" - logger.debug('Getting {operation} fx mask settings now...') - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] - if operation == 'mask_landsea': - settings[operation]['fx_files'] = fx_list - elif operation == 'weighting_landsea_fraction': - settings[operation]['fx_files'] = fx_dict - logger.info(msg, '{operation} masking', pformat(fx_dict)) - - landsea_ops = ['mask_landsea', 'weighting_landsea_fraction'] - for landsea_op in landsea_ops: - if landsea_op in settings: - _update_landsea(landsea_op, variable, config_user) - - if 'mask_landseaice' in settings: - logger.debug('Getting land-seaice fx mask settings now...') - settings['mask_landseaice']['fx_files'] = [] - fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user) - fx_files_dict = {'sftgif': fx_file} - if fx_files_dict['sftgif']: - settings['mask_landseaice']['fx_files'].append( - fx_files_dict['sftgif']) - logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) - - -def _get_fx_stats_settings(variable, settings, config_user): - """Assemble the fx vars structures for area/volume stats.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): - var = dict(variable) - fx_files_dict = {} - var['fx_files'] = settings.get(step, {}).get('fx_files') - if var['fx_files']: - for fxvar in var['fx_files']: - fxvar_name = fxvar - if isinstance(fxvar, dict): - try: - fxvar_name = fxvar["short_name"] - except KeyError: - raise RecipeError( - "No entry short_name found in {}".format(fxvar) - ) - _, cmor_fx_var = _get_correct_fx_file( - variable, fxvar, config_user) - if cmor_fx_var: - fx_files_dict[fxvar_name] = get_output_file( - var, - config_user['preproc_dir'], - fx_var_alias=cmor_fx_var) - - # finally construct the fx files dictionary - settings[step]['fx_files'] = fx_files_dict - logger.info(msg, step, pformat(fx_files_dict)) +def _update_fx_settings_mask(settings, variable, config_user, fx_vars, + as_list): + """Update settings with mask fx file list or dict.""" + logger.debug('Getting land-sea fx mask settings now...') + fx_dict = { + fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0] + for fx_var in fx_vars + } + if as_list: + fx_list = [ + fx_file for fx_file in fx_dict.values() if fx_file + ] + settings['fx_files'] = fx_list + else: + settings['fx_files'] = fx_dict + logger.info(pformat(settings['fx_files'])) + + +def _update_fx_settings_stats(settings, variable, config_user, fx_vars): + """Update settings with spatial analysis fx vars files dict.""" + if not fx_vars: + return + fx_vars = [ + _get_correct_fx_file(variable, fxvar, config_user)[1] + for fxvar in fx_vars + ] + fx_dict = { + fx_var['short_name']: get_output_file(variable, + config_user['preproc_dir'], + fx_var_alias=fx_var) + for fx_var in fx_vars if fx_var + } + settings['fx_files'] = fx_dict + logger.info(pformat(settings['fx_files'])) def _update_fx_settings(settings, variable, config_user): - """Find and set the general FX mask/stats settings.""" - # get settings for fixed fx masks: land, sea or seaice or weighted - _get_fx_mask_settings(variable, settings, config_user) - # get settings for fx variables that need preprocessing - _get_fx_stats_settings(variable, settings, config_user) + """Update fx settings depending on the needed method.""" + msg = f"Using fx files for %s of dataset %s:\n%s" + # landsea mask fx variables + landseamask_vars = ['sftlf'] + if variable['project'] != 'obs4mips': + landseamask_vars.append('sftof') + + # spatial stats fx variables + def _get_spatial_stats_fx_vars(settings, step_name): + spatial_stats_fx_vars = settings.get(step_name, {}).get('fx_files') + return spatial_stats_fx_vars + + update_methods = { + 'mask_landsea': (_update_fx_settings_mask, { + 'as_list': True, + 'fx_vars': landseamask_vars + }), + 'mask_landseaice': (_update_fx_settings_mask, { + 'as_list': True, + 'fx_vars': ['sftgif'] + }), + 'weighting_landsea_fraction': (_update_fx_settings_mask, { + 'as_list': False, + 'fx_vars': landseamask_vars + }), + 'area_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'area_statistics') + }), + 'volume_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'volume_statistics') + }), + 'zonal_statistics': (_update_fx_settings_stats, { + 'fx_vars': + _get_spatial_stats_fx_vars(settings, 'zonal_statistics') + }), + } + for step_name, step_settings in settings.items(): + update_method, kwargs = update_methods.get(step_name, (None, {})) + if update_method: + update_method(step_settings, variable, config_user, **kwargs) + logger.info(msg, variable['short_name'], + variable['dataset'], step_name) def _read_attributes(filename): @@ -550,10 +551,10 @@ def _read_attributes(filename): def _get_input_files(variable, config_user): """Get the input files for a single dataset (locally and via download).""" - (input_files, dirnames, filenames) = get_input_filelist( - variable=variable, - rootpath=config_user['rootpath'], - drs=config_user['drs']) + (input_files, dirnames, + filenames) = get_input_filelist(variable=variable, + rootpath=config_user['rootpath'], + drs=config_user['drs']) # Set up downloading using synda if requested. # Do not download if files are already available locally. @@ -567,8 +568,8 @@ def _get_input_files(variable, config_user): def _get_ancestors(variable, config_user): """Get the input files for a single dataset and setup provenance.""" - (input_files, dirnames, filenames) = _get_input_files(variable, - config_user) + (input_files, dirnames, + filenames) = _get_input_files(variable, config_user) logger.info("Using input files for variable %s of dataset %s:\n%s", variable['short_name'], variable['dataset'], @@ -713,8 +714,12 @@ def get_matching(attributes): return grouped_products -def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user, var_fxvar_coupling=False): +def _get_preprocessor_products(variables, + profile, + order, + ancestor_products, + config_user, + var_fxvar_coupling=False): """ Get preprocessor product definitions for a set of datasets. @@ -756,10 +761,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings( - settings=settings, - variable=variable, - config_user=config_user) + _update_fx_settings(settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -769,8 +773,8 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, _update_regrid_time(variable, settings) ancestors = grouped_ancestors.get(variable['filename']) fx_files_in_settings = [ - setting.get('fx_files') for setting in settings.values() if - setting.get('fx_files') is not None + setting.get('fx_files') for setting in settings.values() + if setting.get('fx_files') is not None ] if not ancestors or fx_files_in_settings: ancestors = _get_ancestors(variable, config_user) @@ -814,8 +818,7 @@ def _get_single_preprocessor_task(variables, order=order, ancestor_products=ancestor_products, config_user=config_user, - var_fxvar_coupling=var_fxvar_coupling - ) + var_fxvar_coupling=var_fxvar_coupling) if not products: raise RecipeError( @@ -889,8 +892,7 @@ def append(group_prefix, var): for variable in variables: group_prefix = variable['variable_group'] + '_derive_input_' if not variable.get('force_derivation') and _get_input_files( - variable, - config_user)[0]: + variable, config_user)[0]: # No need to derive, just process normally up to derive step var = deepcopy(variable) append(group_prefix, var) @@ -924,8 +926,8 @@ def _get_filtered_fxprofile(fxprofile): return fxprofile -def _add_fxvar_preprocessing_ancestors(step, profile, variables, - config_user, task_name): +def _add_fxvar_preprocessing_ancestors(step, profile, variables, config_user, + task_name): """Bolt on the fx vars preproc ancestors, if needed.""" fx_preproc_tasks = [] # conserve profile @@ -942,11 +944,10 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables, for fx_variable in fx_variables: # list may be (intentionally) empty - catch it here if not fx_variable: - raise RecipeError( - f"One or more of {step} fx data " - f"for {var['short_name']} are missing. " - f"Task can not be performed since there is " - f"no fx data found.") + raise RecipeError(f"One or more of {step} fx data " + f"for {var['short_name']} are missing. " + f"Task can not be performed since there is " + f"no fx data found.") before, _ = _split_settings(fx_profile, step, order) # remove time preprocessors for any fx/Ofx/Efx/etc # that dont have time coords @@ -955,13 +956,11 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables, fx_name = task_name.split( TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ fx_variable['variable_group'] - task = _get_single_preprocessor_task( - [fx_variable, var], - before, - config_user, - name=fx_name, - var_fxvar_coupling=True - ) + task = _get_single_preprocessor_task([fx_variable, var], + before, + config_user, + name=fx_name, + var_fxvar_coupling=True) fx_preproc_tasks.append(task) return fx_preproc_tasks @@ -1033,7 +1032,8 @@ def _check_duplication(task): ancestors_filename_dict[anc_product.filename] = ancestor_task filenames.append(anc_product.filename) set_ancestors = { - ancestors_filename_dict[filename] for filename in filenames + ancestors_filename_dict[filename] + for filename in filenames } task.ancestors = [ ancestor for ancestor in task.ancestors @@ -1046,9 +1046,8 @@ def _check_duplication(task): class Recipe: """Recipe object.""" - info_keys = ( - 'project', 'activity', 'dataset', 'exp', 'ensemble', 'version' - ) + info_keys = ('project', 'activity', 'dataset', 'exp', 'ensemble', + 'version') """List of keys to be used to compose the alias, ordered by priority.""" def __init__(self, raw_recipe,