Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 116 additions & 117 deletions esmvalcore/_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ def _update_target_levels(variable, variables, settings, config_user):
short_name=variable_data['short_name'],
mip=variable_data['mip'],
frequency=variable_data['frequency'],
fix_dir=os.path.splitext(
variable_data['filename'])[0] + '_fixed',
fix_dir=os.path.splitext(variable_data['filename'])[0] +
'_fixed',
)


Expand Down Expand Up @@ -232,8 +232,8 @@ def _dataset_to_file(variable, config_user):
for required_var in required_vars:
_augment(required_var, variable)
_add_cmor_info(required_var, override=True)
(files, dirnames, filenames) = _get_input_files(required_var,
config_user)
(files, dirnames,
filenames) = _get_input_files(required_var, config_user)
if files:
variable = required_var
break
Expand Down Expand Up @@ -434,19 +434,6 @@ def _get_correct_fx_file(variable, fx_variable, config_user):
return fx_files, valid_fx_vars


def _get_landsea_fraction_fx_dict(variable, config_user):
"""Get dict of available ``sftlf`` and ``sftof`` variables."""
fx_dict = {}
fx_vars = ['sftlf']
if variable['project'] != 'obs4mips':
fx_vars.append('sftof')
for fx_var in fx_vars:
fx_file, _ = _get_correct_fx_file(variable, fx_var, config_user)
fx_dict[fx_var] = fx_file

return fx_dict


def _exclude_dataset(settings, variable, step):
"""Exclude dataset from specific preprocessor step if requested."""
exclude = {
Expand All @@ -466,73 +453,87 @@ def _update_weighting_settings(settings, variable):
_exclude_dataset(settings, variable, 'weighting_landsea_fraction')


def _get_fx_mask_settings(variable, settings, config_user):
"""Assemble the fx var structures for land/sea masking."""
msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s"

def _update_landsea(operation, variable, config_user):
"""Update settings for either landsea or weighted landsea."""
logger.debug('Getting {operation} fx mask settings now...')
fx_dict = _get_landsea_fraction_fx_dict(variable, config_user)
fx_list = [fx_file for fx_file in fx_dict.values() if fx_file]
if operation == 'mask_landsea':
settings[operation]['fx_files'] = fx_list
elif operation == 'weighting_landsea_fraction':
settings[operation]['fx_files'] = fx_dict
logger.info(msg, '{operation} masking', pformat(fx_dict))

landsea_ops = ['mask_landsea', 'weighting_landsea_fraction']
for landsea_op in landsea_ops:
if landsea_op in settings:
_update_landsea(landsea_op, variable, config_user)

if 'mask_landseaice' in settings:
logger.debug('Getting land-seaice fx mask settings now...')
settings['mask_landseaice']['fx_files'] = []
fx_file, _ = _get_correct_fx_file(variable, 'sftgif', config_user)
fx_files_dict = {'sftgif': fx_file}
if fx_files_dict['sftgif']:
settings['mask_landseaice']['fx_files'].append(
fx_files_dict['sftgif'])
logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict))


def _get_fx_stats_settings(variable, settings, config_user):
"""Assemble the fx vars structures for area/volume stats."""
msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s"
for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'):
var = dict(variable)
fx_files_dict = {}
var['fx_files'] = settings.get(step, {}).get('fx_files')
if var['fx_files']:
for fxvar in var['fx_files']:
fxvar_name = fxvar
if isinstance(fxvar, dict):
try:
fxvar_name = fxvar["short_name"]
except KeyError:
raise RecipeError(
"No entry short_name found in {}".format(fxvar)
)
_, cmor_fx_var = _get_correct_fx_file(
variable, fxvar, config_user)
if cmor_fx_var:
fx_files_dict[fxvar_name] = get_output_file(
var,
config_user['preproc_dir'],
fx_var_alias=cmor_fx_var)

# finally construct the fx files dictionary
settings[step]['fx_files'] = fx_files_dict
logger.info(msg, step, pformat(fx_files_dict))
def _update_fx_settings_mask(settings, variable, config_user, fx_vars,
as_list):
"""Update settings with mask fx file list or dict."""
logger.debug('Getting land-sea fx mask settings now...')
fx_dict = {
fx_var: _get_correct_fx_file(variable, fx_var, config_user)[0]
for fx_var in fx_vars
}
if as_list:
fx_list = [
fx_file for fx_file in fx_dict.values() if fx_file
]
settings['fx_files'] = fx_list
else:
settings['fx_files'] = fx_dict
logger.info(pformat(settings['fx_files']))


def _update_fx_settings_stats(settings, variable, config_user, fx_vars):
"""Update settings with spatial analysis fx vars files dict."""
if not fx_vars:
return
fx_vars = [
_get_correct_fx_file(variable, fxvar, config_user)[1]
for fxvar in fx_vars
]
fx_dict = {
fx_var['short_name']: get_output_file(variable,
config_user['preproc_dir'],
fx_var_alias=fx_var)
for fx_var in fx_vars if fx_var
}
settings['fx_files'] = fx_dict
logger.info(pformat(settings['fx_files']))


def _update_fx_settings(settings, variable, config_user):
"""Find and set the general FX mask/stats settings."""
# get settings for fixed fx masks: land, sea or seaice or weighted
_get_fx_mask_settings(variable, settings, config_user)
# get settings for fx variables that need preprocessing
_get_fx_stats_settings(variable, settings, config_user)
"""Update fx settings depending on the needed method."""
msg = f"Using fx files for %s of dataset %s:\n%s"
# landsea mask fx variables
landseamask_vars = ['sftlf']
if variable['project'] != 'obs4mips':
landseamask_vars.append('sftof')

# spatial stats fx variables
def _get_spatial_stats_fx_vars(settings, step_name):
spatial_stats_fx_vars = settings.get(step_name, {}).get('fx_files')
return spatial_stats_fx_vars

update_methods = {
'mask_landsea': (_update_fx_settings_mask, {
'as_list': True,
'fx_vars': landseamask_vars
}),
'mask_landseaice': (_update_fx_settings_mask, {
'as_list': True,
'fx_vars': ['sftgif']
}),
'weighting_landsea_fraction': (_update_fx_settings_mask, {
'as_list': False,
'fx_vars': landseamask_vars
}),
'area_statistics': (_update_fx_settings_stats, {
'fx_vars':
_get_spatial_stats_fx_vars(settings, 'area_statistics')
}),
'volume_statistics': (_update_fx_settings_stats, {
'fx_vars':
_get_spatial_stats_fx_vars(settings, 'volume_statistics')
}),
'zonal_statistics': (_update_fx_settings_stats, {
'fx_vars':
_get_spatial_stats_fx_vars(settings, 'zonal_statistics')
}),
}
for step_name, step_settings in settings.items():
update_method, kwargs = update_methods.get(step_name, (None, {}))
if update_method:
update_method(step_settings, variable, config_user, **kwargs)
logger.info(msg, variable['short_name'],
variable['dataset'], step_name)


def _read_attributes(filename):
Expand All @@ -550,10 +551,10 @@ def _read_attributes(filename):

def _get_input_files(variable, config_user):
"""Get the input files for a single dataset (locally and via download)."""
(input_files, dirnames, filenames) = get_input_filelist(
variable=variable,
rootpath=config_user['rootpath'],
drs=config_user['drs'])
(input_files, dirnames,
filenames) = get_input_filelist(variable=variable,
rootpath=config_user['rootpath'],
drs=config_user['drs'])

# Set up downloading using synda if requested.
# Do not download if files are already available locally.
Expand All @@ -567,8 +568,8 @@ def _get_input_files(variable, config_user):

def _get_ancestors(variable, config_user):
"""Get the input files for a single dataset and setup provenance."""
(input_files, dirnames, filenames) = _get_input_files(variable,
config_user)
(input_files, dirnames,
filenames) = _get_input_files(variable, config_user)

logger.info("Using input files for variable %s of dataset %s:\n%s",
variable['short_name'], variable['dataset'],
Expand Down Expand Up @@ -713,8 +714,12 @@ def get_matching(attributes):
return grouped_products


def _get_preprocessor_products(variables, profile, order, ancestor_products,
config_user, var_fxvar_coupling=False):
def _get_preprocessor_products(variables,
profile,
order,
ancestor_products,
config_user,
var_fxvar_coupling=False):
"""
Get preprocessor product definitions for a set of datasets.

Expand Down Expand Up @@ -756,10 +761,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products,
)
_update_extract_shape(settings, config_user)
_update_weighting_settings(settings, variable)
_update_fx_settings(
settings=settings,
variable=variable,
config_user=config_user)
_update_fx_settings(settings=settings,
variable=variable,
config_user=config_user)
_update_target_grid(
variable=variable,
variables=variables,
Expand All @@ -769,8 +773,8 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products,
_update_regrid_time(variable, settings)
ancestors = grouped_ancestors.get(variable['filename'])
fx_files_in_settings = [
setting.get('fx_files') for setting in settings.values() if
setting.get('fx_files') is not None
setting.get('fx_files') for setting in settings.values()
if setting.get('fx_files') is not None
]
if not ancestors or fx_files_in_settings:
ancestors = _get_ancestors(variable, config_user)
Expand Down Expand Up @@ -814,8 +818,7 @@ def _get_single_preprocessor_task(variables,
order=order,
ancestor_products=ancestor_products,
config_user=config_user,
var_fxvar_coupling=var_fxvar_coupling
)
var_fxvar_coupling=var_fxvar_coupling)

if not products:
raise RecipeError(
Expand Down Expand Up @@ -889,8 +892,7 @@ def append(group_prefix, var):
for variable in variables:
group_prefix = variable['variable_group'] + '_derive_input_'
if not variable.get('force_derivation') and _get_input_files(
variable,
config_user)[0]:
variable, config_user)[0]:
# No need to derive, just process normally up to derive step
var = deepcopy(variable)
append(group_prefix, var)
Expand Down Expand Up @@ -924,8 +926,8 @@ def _get_filtered_fxprofile(fxprofile):
return fxprofile


def _add_fxvar_preprocessing_ancestors(step, profile, variables,
config_user, task_name):
def _add_fxvar_preprocessing_ancestors(step, profile, variables, config_user,
task_name):
"""Bolt on the fx vars preproc ancestors, if needed."""
fx_preproc_tasks = []
# conserve profile
Expand All @@ -942,11 +944,10 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables,
for fx_variable in fx_variables:
# list may be (intentionally) empty - catch it here
if not fx_variable:
raise RecipeError(
f"One or more of {step} fx data "
f"for {var['short_name']} are missing. "
f"Task can not be performed since there is "
f"no fx data found.")
raise RecipeError(f"One or more of {step} fx data "
f"for {var['short_name']} are missing. "
f"Task can not be performed since there is "
f"no fx data found.")
before, _ = _split_settings(fx_profile, step, order)
# remove time preprocessors for any fx/Ofx/Efx/etc
# that dont have time coords
Expand All @@ -955,13 +956,11 @@ def _add_fxvar_preprocessing_ancestors(step, profile, variables,
fx_name = task_name.split(
TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \
fx_variable['variable_group']
task = _get_single_preprocessor_task(
[fx_variable, var],
before,
config_user,
name=fx_name,
var_fxvar_coupling=True
)
task = _get_single_preprocessor_task([fx_variable, var],
before,
config_user,
name=fx_name,
var_fxvar_coupling=True)
fx_preproc_tasks.append(task)

return fx_preproc_tasks
Expand Down Expand Up @@ -1033,7 +1032,8 @@ def _check_duplication(task):
ancestors_filename_dict[anc_product.filename] = ancestor_task
filenames.append(anc_product.filename)
set_ancestors = {
ancestors_filename_dict[filename] for filename in filenames
ancestors_filename_dict[filename]
for filename in filenames
}
task.ancestors = [
ancestor for ancestor in task.ancestors
Expand All @@ -1046,9 +1046,8 @@ def _check_duplication(task):
class Recipe:
"""Recipe object."""

info_keys = (
'project', 'activity', 'dataset', 'exp', 'ensemble', 'version'
)
info_keys = ('project', 'activity', 'dataset', 'exp', 'ensemble',
'version')
"""List of keys to be used to compose the alias, ordered by priority."""
def __init__(self,
raw_recipe,
Expand Down