161 changes: 133 additions & 28 deletions doc/esmvalcore/recipe.rst
@@ -147,7 +147,7 @@ arguments):
operator: mean
multi_model_statistics:
span: overlap
statistics: [mean ]
statistics: [mean]

.. note::

@@ -161,20 +161,138 @@ arguments):
Recipe section: ``diagnostics``
===============================

The diagnostics section includes one or more diagnostics. Each diagnostics will
include:
The diagnostics section includes one or more diagnostics. Each diagnostic
section will include:

- a list of which variables to load;
- a description of the variables (optional);
- the preprocessor to be applied to each variable;
- the script to be run;
- the variable(s) to preprocess, including the preprocessor to be applied to each variable;
- the diagnostic script(s) to be run;
- a description of the diagnostic and lists of themes and realms that it applies to;
- an optional ``additional_datasets`` section.

The ``additional_datasets`` can add datasets beyond those listed in the
Datasets_ section. This is useful if specific datasets need to be used only by
a specific diagnostic. The ``additional_datasets`` can also be used to add
variable specific datasets. This is also a good way to add observational
datasets, which are usually variable-specific.
The diagnostics section defines tasks
-------------------------------------
The diagnostic section(s) define the tasks that will be executed when running
the recipe. For each variable a preprocessing task will be defined, and for
each diagnostic script a diagnostic task will be defined. If variables need to
be derived from other variables, a preprocessing task for each of the
variables needed to derive that variable will be defined as well. These tasks
can be viewed in the ``main_log_debug.txt`` file that is produced on every
run. Each task has a unique name that defines the subdirectory where the
results of that task are stored. Task names start with the name of the
diagnostic section, followed by a ``/``, and then the name of the variable
section for a preprocessing task, or the name of the diagnostic script
section for a diagnostic task.
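The naming scheme described above can be sketched as follows (a minimal illustration, not the actual ESMValCore implementation; the section names are the hypothetical examples used below):

```python
# Sketch of the task naming scheme: the diagnostic section name,
# a '/', and the variable or script section name.
def task_name(diagnostic_section, subsection):
    """Return the task name, which also names the output subdirectory."""
    return diagnostic_section + '/' + subsection

preproc_task = task_name('diagnostic_name', 'variable_name')
diag_task = task_name('diagnostic_name', 'script_name')
print(preproc_task)  # diagnostic_name/variable_name
print(diag_task)     # diagnostic_name/script_name
```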

A (simplified) example diagnostics section could look like this:

.. code-block:: yaml

diagnostics:
diagnostic_name:
description: Air temperature tutorial diagnostic.
themes:
- phys
realms:
- atmos
variables:
variable_name:
short_name: ta
preprocessor: preprocessor_name
mip: Amon
scripts:
script_name:
script: examples/diagnostic.py


Note that the example recipe above contains a single diagnostic section
called ``diagnostic_name`` and will result in two tasks:

- a preprocessing task called ``diagnostic_name/variable_name`` that will preprocess
air temperature data for each dataset in the Datasets_ section of the recipe (not shown).
- a diagnostic task called ``diagnostic_name/script_name``

The path to the script provided in the ``script`` option should be
either the absolute path to the script, or the path relative to the
``esmvaltool/diag_scripts`` directory.
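The lookup can be sketched as follows; this is a simplified sketch of the rule just described, not the exact ESMValCore code, and the installation directory shown is hypothetical:

```python
import os

def resolve_script(script, diag_scripts_dir):
    """Resolve a recipe ``script`` entry: an absolute path is used as-is,
    a relative path is looked up under the diag_scripts directory."""
    if os.path.isabs(script):
        return script
    return os.path.join(diag_scripts_dir, script)

# Hypothetical installation directory, for illustration only:
print(resolve_script('examples/diagnostic.py', '/opt/esmvaltool/diag_scripts'))
```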

Ancestor tasks
--------------
Some tasks require the results of other tasks to be available before they can
start, e.g. a diagnostic script needs the preprocessed variable data before
it can run. Thus each task has zero or more ancestor tasks. By default, each
diagnostic task in a diagnostic section has all variable preprocessing tasks
in that same section as ancestors. However, this can be changed using the
``ancestors`` keyword. Note that wildcard expansion can be used to define
ancestors.

.. code-block:: yaml

diagnostics:
diagnostic_1:
variables:
airtemp:
short_name: ta
preprocessor: preprocessor_name
mip: Amon
scripts:
script_a:
script: diagnostic_a.py
diagnostic_2:
variables:
precip:
short_name: pr
preprocessor: preprocessor_name
mip: Amon
scripts:
script_b:
script: diagnostic_b.py
ancestors: [diagnostic_1/script_a, precip]


The example recipe above will result in four tasks:

- a preprocessing task called ``diagnostic_1/airtemp``
- a diagnostic task called ``diagnostic_1/script_a``
- a preprocessing task called ``diagnostic_2/precip``
- a diagnostic task called ``diagnostic_2/script_b``

The preprocessing tasks do not have any ancestors, while the
``diagnostic_a.py`` script will receive the preprocessed air temperature data
(it has ancestor ``diagnostic_1/airtemp``) and the ``diagnostic_b.py`` script
will receive the results of ``diagnostic_a.py`` and the preprocessed
precipitation data (it has ancestors ``diagnostic_1/script_a`` and
``diagnostic_2/precip``).
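Wildcard expansion of ``ancestors`` patterns against task names can be sketched with Python's ``fnmatch`` module. This is an illustrative sketch only, assuming that a pattern without a ``/`` matches a section name in any diagnostic; it is not the exact matching code used by ESMValCore:

```python
import fnmatch

task_names = [
    'diagnostic_1/airtemp',
    'diagnostic_1/script_a',
    'diagnostic_2/precip',
]

def expand_ancestors(patterns, names):
    """Return every task name matched by any of the given patterns."""
    matched = set()
    for pattern in patterns:
        if '/' not in pattern:
            # A bare section name matches tasks in any diagnostic section.
            pattern = '*/' + pattern
        matched.update(fnmatch.filter(names, pattern))
    return sorted(matched)

print(expand_ancestors(['diagnostic_1/script_a', 'precip'], task_names))
# ['diagnostic_1/script_a', 'diagnostic_2/precip']
```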

Task priority
-------------
Tasks are assigned a priority, with tasks appearing earlier in the recipe
getting a higher priority. The tasks will be executed sequentially or in
parallel, depending on the setting of ``max_parallel_tasks`` in the
:ref:`user configuration file`. When fewer than ``max_parallel_tasks`` tasks
are running, new tasks will be started according to their priority.
Naturally, only tasks that are not waiting for ancestor tasks can be started.
This feature makes it possible to reduce the processing time of recipes with
many tasks, by placing tasks that take relatively long near the top of the
recipe. Of course this only works when ``max_parallel_tasks`` is set to a
value larger than 1. The current priority and run time of individual tasks
can be seen in the log messages shown when running the tool (a lower number
means a higher priority).
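The scheduling rule described above can be sketched as follows, using plain dictionaries instead of real task objects and assuming that a lower number means a higher priority:

```python
def startable(scheduled, done, running, max_parallel_tasks):
    """Pick the tasks to start next: only tasks whose ancestors are all
    done, highest priority (lowest number) first, filling the free slots."""
    free_slots = max_parallel_tasks - len(running)
    ready = [task for task in scheduled
             if all(ancestor in done for ancestor in task['ancestors'])]
    return sorted(ready, key=lambda task: task['priority'])[:free_slots]

scheduled = [
    {'name': 'd1/airtemp', 'priority': 0, 'ancestors': []},
    {'name': 'd1/script_a', 'priority': 1, 'ancestors': ['d1/airtemp']},
    {'name': 'd2/precip', 'priority': 2, 'ancestors': []},
]
# With two free slots and nothing done yet, only the two preprocessing
# tasks can start, in priority order:
print([t['name'] for t in startable(scheduled, done=set(), running=[],
                                    max_parallel_tasks=2)])
# ['d1/airtemp', 'd2/precip']
```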

Variable and dataset definitions
--------------------------------
To define a variable/dataset combination that corresponds to an actual
variable from a dataset, the keys in each variable section are combined with
the keys of each dataset definition. If two versions of the same key are
provided, the key in the datasets section takes precedence over the key in
the variable section. For many recipes it makes more sense to define the
``start_year`` and ``end_year`` items in the variable section, because the
diagnostic script assumes that all the data has the same time range.
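The key combination can be sketched as a dictionary merge in which dataset keys win; the key values below are hypothetical illustration values, not taken from a real recipe:

```python
# Variable section keys, as they might appear in a recipe:
variable = {'short_name': 'ta', 'mip': 'Amon',
            'start_year': 2000, 'end_year': 2005}
# A dataset definition that also sets start_year:
dataset = {'dataset': 'EXAMPLE-MODEL', 'start_year': 1990}

# In a dict merge, later keys win, so dataset values take precedence:
combined = {**variable, **dataset}
print(combined['start_year'])  # 1990
print(combined['short_name'])  # ta
```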

Diagnostic and variable specific datasets
-----------------------------------------
The ``additional_datasets`` option can be used to add datasets beyond those
listed in the Datasets_ section. This is useful if specific datasets need to
be used only by a specific diagnostic or variable: it can be added both at
the diagnostic level, where it applies to all variables in that diagnostic
section, and at the level of an individual variable. For example, this can
be a good way to add observational datasets, which are usually
variable-specific.

Running a simple diagnostic
---------------------------
@@ -199,22 +317,9 @@ map script, ``ocean/diagnostic_maps.py``.
Global_Ocean_Surface_regrid_map:
script: ocean/diagnostic_maps.py

To define a variable/dataset combination, the keys in the diagnostic section
are combined with the keys from datasets section. If two versions of the same
key are provided, then the key in the datasets section will take precedence
over the keys in variables section. For many recipes it makes more sense to
define the ``start_year`` and ``end_year`` items in the variable section,
because the diagnostic script assumes that all the data has the same time
range.

Note that the path to the script provided in the `script` option should be
either the absolute path to the script, or the path relative to the
``esmvaltool/diag_scripts`` directory.


Passing arguments to a diagnostic
---------------------------------
The ``diagnostics`` section may include a lot of arguments that can be used by
Passing arguments to a diagnostic script
----------------------------------------
The diagnostic script section(s) may include custom arguments that can be used by
the diagnostic script; these arguments are stored at runtime in a dictionary
that is then made available to the diagnostic script via the interface link,
independent of the language the diagnostic script is written in. Here is an
6 changes: 6 additions & 0 deletions esmvalcore/_recipe.py
@@ -1155,6 +1155,7 @@ def initialize_tasks(self):
logger.info("Creating tasks from recipe")
tasks = set()

priority = 0
for diagnostic_name, diagnostic in self.diagnostics.items():
logger.info("Creating tasks for diagnostic %s", diagnostic_name)

@@ -1169,7 +1170,10 @@
config_user=self._cfg,
task_name=task_name,
)
for task0 in task.flatten():
task0.priority = priority
tasks.add(task)
priority += 1

# Create diagnostic tasks
for script_name, script_cfg in diagnostic['scripts'].items():
@@ -1181,7 +1185,9 @@
settings=script_cfg['settings'],
name=task_name,
)
task.priority = priority
tasks.add(task)
priority += 1

check.tasks_valid(tasks)

89 changes: 48 additions & 41 deletions esmvalcore/_task.py
@@ -10,12 +10,12 @@
import threading
import time
from copy import deepcopy
from multiprocessing import Pool, cpu_count
from multiprocessing import Pool

import psutil
import yaml

from ._config import TAGS, replace_tags, DIAGNOSTICS_PATH
from ._config import DIAGNOSTICS_PATH, TAGS, replace_tags
from ._provenance import TrackedFile, get_task_provenance

logger = logging.getLogger(__name__)
@@ -193,12 +193,14 @@ def _ncl_type(value):
class BaseTask:
"""Base class for defining task classes."""

def __init__(self, ancestors=None, name=''):
def __init__(self, ancestors=None, name='', products=None):
"""Initialize task."""
self.ancestors = [] if ancestors is None else ancestors
self.products = set() if products is None else set(products)
self.output_files = None
self.name = name
self.activity = None
self.priority = 0

def initialize_provenance(self, recipe_entity):
"""Initialize task provenance activity."""
@@ -224,8 +226,11 @@ def run(self, input_files=None):
input_files.extend(task.run())
logger.info("Starting task %s in process [%s]", self.name,
os.getpid())
start = datetime.datetime.now()
self.output_files = self._run(input_files)
logger.info("Successfully completed task %s", self.name)
runtime = datetime.datetime.now() - start
logger.info("Successfully completed task %s (priority %s) in %s",
self.name, self.priority, runtime)

return self.output_files

@@ -254,10 +259,9 @@ class DiagnosticTask(BaseTask):

def __init__(self, script, settings, output_dir, ancestors=None, name=''):
"""Create a diagnostic task."""
super(DiagnosticTask, self).__init__(ancestors=ancestors, name=name)
super().__init__(ancestors=ancestors, name=name)
self.script = script
self.settings = settings
self.products = set()
self.output_dir = output_dir
self.cmd = self._initialize_cmd(script)
self.log = os.path.join(settings['run_dir'], 'log.txt')
@@ -266,8 +270,7 @@ def __init__(self, script, settings, output_dir, ancestors=None, name=''):

def _initialize_cmd(self, script):
"""Create a an executable command from script."""
diagnostics_root = os.path.join(
DIAGNOSTICS_PATH, 'diag_scripts')
diagnostics_root = os.path.join(DIAGNOSTICS_PATH, 'diag_scripts')
script_file = os.path.abspath(os.path.join(diagnostics_root, script))

if not os.path.isfile(script_file):
@@ -456,8 +459,8 @@ def _run(self, input_files):
env['MPLBACKEND'] = 'Agg'
else:
# Make diag_scripts path available to diagnostic scripts
env['diag_scripts'] = os.path.join(
DIAGNOSTICS_PATH, 'diag_scripts')
env['diag_scripts'] = os.path.join(DIAGNOSTICS_PATH,
'diag_scripts')

cmd = list(self.cmd)
settings_file = self.write_settings()
@@ -601,53 +604,46 @@ def _run_tasks_sequential(tasks):
n_tasks = len(get_flattened_tasks(tasks))
logger.info("Running %s tasks sequentially", n_tasks)

for task in get_independent_tasks(tasks):
tasks = get_independent_tasks(tasks)
for task in sorted(tasks, key=lambda t: t.priority):
task.run()


def _run_tasks_parallel(tasks, max_parallel_tasks=None):
"""Run tasks in parallel."""
scheduled = get_flattened_tasks(tasks)
running = []
results = []
running = {}

n_scheduled, n_running = len(scheduled), len(running)
n_tasks = n_scheduled

pool = Pool(processes=max_parallel_tasks)

logger.info("Running %s tasks using at most %s processes", n_tasks,
max_parallel_tasks or cpu_count())
if max_parallel_tasks is None:
max_parallel_tasks = os.cpu_count()
if max_parallel_tasks > n_tasks:
max_parallel_tasks = n_tasks
logger.info("Running %s tasks using %s processes", n_tasks,
max_parallel_tasks)

def done(task):
"""Assume a task is done if it not scheduled or running."""
return not (task in scheduled or task in running)

pool = Pool(processes=max_parallel_tasks)
while scheduled or running:
# Submit new tasks to pool
just_scheduled = []
for task in scheduled:
if not task.ancestors or all(done(t) for t in task.ancestors):
result = pool.apply_async(_run_task, [task])
results.append(result)
running.append(task)
just_scheduled.append(task)
for task in just_scheduled:
scheduled.remove(task)
for task in sorted(scheduled, key=lambda t: t.priority):
if len(running) >= max_parallel_tasks:
break
if all(done(t) for t in task.ancestors):
future = pool.apply_async(_run_task, [task])
running[task] = future
scheduled.remove(task)

# Handle completed tasks
for task, result in zip(running, results):
if result.ready():
task.output_files, updated_products = result.get()
for updated in updated_products:
for original in task.products:
if original.filename == updated.filename:
updated.copy_provenance(target=original)
break
else:
task.products.add(updated)
running.remove(task)
results.remove(result)
ready = {t for t in running if running[t].ready()}
for task in ready:
_copy_results(task, running[task])
running.pop(task)

# Wait if there are still tasks running
if running:
@@ -658,14 +654,25 @@ def done(task):
n_scheduled, n_running = len(scheduled), len(running)
n_done = n_tasks - n_scheduled - n_running
logger.info(
"Progress: %s tasks running or queued, %s tasks waiting for "
"ancestors, %s/%s done", n_running, n_scheduled, n_done,
n_tasks)
"Progress: %s tasks running, %s tasks waiting for ancestors, "
"%s/%s done", n_running, n_scheduled, n_done, n_tasks)

pool.close()
pool.join()


def _copy_results(task, future):
"""Update task with the results from the remote process."""
task.output_files, updated_products = future.get()
for updated in updated_products:
for original in task.products:
if original.filename == updated.filename:
updated.copy_provenance(target=original)
break
else:
task.products.add(updated)


def _run_task(task):
"""Run task and return the result."""
output_files = task.run()