From 55dd2dceaf5dda8c70d322da44918a17633d1394 Mon Sep 17 00:00:00 2001
From: Bas Harenslak
Date: Sat, 30 Mar 2019 09:42:52 +0100
Subject: [PATCH] [AIRFLOW-4192] Reorganize task context variables

---
 UPDATING.md                                   | 18 +++++
 .../example_dags/example_qubole_sensor.py     |  3 +-
 airflow/models/taskinstance.py                | 51 +++++---------
 airflow/operators/hive_to_druid.py            |  4 +-
 docs/macros.rst                               | 68 ++++++++++---------
 tests/cli/test_cli.py                         |  1 -
 tests/core.py                                 |  6 --
 7 files changed, 76 insertions(+), 75 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index cd0bfc3ba2dd3..e9f63bf3b4583 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,24 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Some task context variables are removed
+The following task context variables have been removed. Some were unused, some duplicated a
+value that is also available via another variable, and some were values that can be derived
+from other variables that remain in the task context.
+
+* `yesterday_ds`: can be derived from `execution_date` (`{{ (execution_date - macros.timedelta(1)).strftime('%Y-%m-%d') }}`)
+* `yesterday_ds_nodash`: can be derived from `execution_date` (`{{ (execution_date - macros.timedelta(1)).strftime('%Y%m%d') }}`)
+* `tomorrow_ds`: can be derived from `execution_date` (`{{ (execution_date + macros.timedelta(1)).strftime('%Y-%m-%d') }}`)
+* `tomorrow_ds_nodash`: can be derived from `execution_date` (`{{ (execution_date + macros.timedelta(1)).strftime('%Y%m%d') }}`)
+* `tables`: also given via params (`{{ params.get('tables', None) }}`)
+* `latest_date`: same value as `ds`
+* `inlets`: unused
+* `outlets`: unused
+* `end_date`: same value as `ds`
+* `END_DATE`: same value as `ds`
+
+For more information, see https://github.com/apache/airflow/pull/5010.
+
 ### `pool` config option in Celery section to support different Celery pool implementation
 
 The new `pool` config option allows users to choose different pool
diff --git a/airflow/contrib/example_dags/example_qubole_sensor.py b/airflow/contrib/example_dags/example_qubole_sensor.py
index 3922d3eca9005..531900cd40483 100644
--- a/airflow/contrib/example_dags/example_qubole_sensor.py
+++ b/airflow/contrib/example_dags/example_qubole_sensor.py
@@ -70,7 +70,8 @@
               {"column": "month", "values":
                ["{{ ds.split('-')[1] }}"]},
               {"column": "day", "values":
-               ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]}
+               ["{{ ds.split('-')[2] }}",
+                "{{ (execution_date - macros.timedelta(days=1)).strftime('%d') }}"]}
           ]  # will check for partitions like [month=12/day=12,month=12/day=13]
           },
     dag=dag
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 412abf213850b..b900fd891abbf 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1102,9 +1102,6 @@ def is_eligible_to_retry(self):
     def get_template_context(self, session=None):
         task = self.task
         from airflow import macros
-        tables = None
-        if 'tables' in task.params:
-            tables = task.params['tables']
 
         params = {}
         run_id = ''
@@ -1124,11 +1121,6 @@
         session.expunge_all()
         session.commit()
 
-        ds = self.execution_date.strftime('%Y-%m-%d')
-        ts = self.execution_date.isoformat()
-        yesterday_ds = (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
-        tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
-
         # For manually triggered dagruns that aren't run on a schedule, next/previous
         # schedule dates don't make sense, and should be set to execution date for
         # consistency with how execution_date is set for manually triggered tasks, i.e.
@@ -1140,6 +1132,9 @@ def get_template_context(self, session=None):
             prev_execution_date = task.dag.previous_schedule(self.execution_date)
             next_execution_date = task.dag.following_schedule(self.execution_date)
 
+        ds = self.execution_date.strftime('%Y-%m-%d')
+        ds_nodash = ds.replace('-', '')
+
         next_ds = None
         next_ds_nodash = None
         if next_execution_date:
@@ -1152,11 +1147,9 @@
             prev_ds = prev_execution_date.strftime('%Y-%m-%d')
             prev_ds_nodash = prev_ds.replace('-', '')
 
-        ds_nodash = ds.replace('-', '')
+        ts = self.execution_date.isoformat()
         ts_nodash = self.execution_date.strftime('%Y%m%dT%H%M%S')
         ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
-        yesterday_ds_nodash = yesterday_ds.replace('-', '')
-        tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
 
         ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
             dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash)
@@ -1198,46 +1191,36 @@ def __repr__(self):
                 return str(self.var)
 
         return {
+            'conf': configuration,
             'dag': task.dag,
+            'dag_run': dag_run,
             'ds': ds,
+            'ds_nodash': ds_nodash,
+            'execution_date': self.execution_date,
+            'macros': macros,
             'next_ds': next_ds,
             'next_ds_nodash': next_ds_nodash,
+            'next_execution_date': next_execution_date,
+            'params': params,
             'prev_ds': prev_ds,
             'prev_ds_nodash': prev_ds_nodash,
-            'ds_nodash': ds_nodash,
-            'ts': ts,
-            'ts_nodash': ts_nodash,
-            'ts_nodash_with_tz': ts_nodash_with_tz,
-            'yesterday_ds': yesterday_ds,
-            'yesterday_ds_nodash': yesterday_ds_nodash,
-            'tomorrow_ds': tomorrow_ds,
-            'tomorrow_ds_nodash': tomorrow_ds_nodash,
-            'END_DATE': ds,
-            'end_date': ds,
-            'dag_run': dag_run,
-            'run_id': run_id,
-            'execution_date': self.execution_date,
             'prev_execution_date': prev_execution_date,
             'prev_execution_date_success': lazy_object_proxy.Proxy(
                 lambda: self.previous_execution_date_success),
             'prev_start_date_success': lazy_object_proxy.Proxy(lambda: self.previous_start_date_success),
-            'next_execution_date': next_execution_date,
-            'latest_date': ds,
-            'macros': macros,
-            'params': params,
-            'tables': tables,
+            'run_id': run_id,
             'task': task,
             'task_instance': self,
-            'ti': self,
             'task_instance_key_str': ti_key_str,
-            'conf': configuration,
             'test_mode': self.test_mode,
+            'ti': self,
+            'ts': ts,
+            'ts_nodash': ts_nodash,
+            'ts_nodash_with_tz': ts_nodash_with_tz,
             'var': {
+                'json': VariableJsonAccessor(),
                 'value': VariableAccessor(),
-                'json': VariableJsonAccessor()
             },
-            'inlets': task.inlets,
-            'outlets': task.outlets,
         }
 
     def overwrite_params_with_dag_run_conf(self, params, dag_run):
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 7505ab95e8bb5..5992f8e5cebe6 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -85,7 +85,9 @@ def __init__(
         self.sql = sql
         self.druid_datasource = druid_datasource
         self.ts_dim = ts_dim
-        self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
+        self.intervals = intervals or [
+            '{{ ds }}/{{ (execution_date + macros.timedelta(days=1)).strftime("%Y-%m-%d") }}'
+        ]
         self.num_shards = num_shards
         self.target_partition_size = target_partition_size
         self.query_granularity = query_granularity
diff --git a/docs/macros.rst b/docs/macros.rst
index 046963553a207..98d33f571e532 100644
--- a/docs/macros.rst
+++ b/docs/macros.rst
@@ -26,57 +26,51 @@ Additional custom macros can be added globally through :ref:`plugins`, or at a D
 Default Variables
 -----------------
 The Airflow engine passes a few variables by default that are accessible
-in all templates
+in all templates:
 
 ===================================== ====================================
 Variable                              Description
 ===================================== ====================================
+``{{ conf }}``                        the full configuration object located at
+                                      ``airflow.configuration.conf`` which
+                                      represents the content of your
+                                      ``airflow.cfg``
+``{{ dag }}``                         the DAG object
+``{{ dag_run }}``                     a reference to the DagRun object
 ``{{ ds }}``                          the execution date as ``YYYY-MM-DD``
 ``{{ ds_nodash }}``                   the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                     the previous execution date as ``YYYY-MM-DD``
-                                      if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
-                                      ``{{ prev_ds }}`` will be ``2018-01-01``
-``{{ prev_ds_nodash }}``              the previous execution date as ``YYYYMMDD`` if exists, else ``None``
+``{{ execution_date }}``              the execution_date (`pendulum.Pendulum`_)
+``{{ macros }}``                      a reference to the macros package, described below
 ``{{ next_ds }}``                     the next execution date as ``YYYY-MM-DD``
                                       if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
                                       ``{{ next_ds }}`` will be ``2018-01-08``
 ``{{ next_ds_nodash }}``              the next execution date as ``YYYYMMDD`` if exists, else ``None``
-``{{ yesterday_ds }}``                the day before the execution date as ``YYYY-MM-DD``
-``{{ yesterday_ds_nodash }}``         the day before the execution date as ``YYYYMMDD``
-``{{ tomorrow_ds }}``                 the day after the execution date as ``YYYY-MM-DD``
-``{{ tomorrow_ds_nodash }}``          the day after the execution date as ``YYYYMMDD``
-``{{ ts }}``                          same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
-``{{ ts_nodash }}``                   same as ``ts`` without ``-``, ``:`` and TimeZone info. Example: ``20180101T000000``
-``{{ ts_nodash_with_tz }}``           same as ``ts`` without ``-`` and ``:``. Example: ``20180101T000000+0000``
-``{{ execution_date }}``              the execution_date (`pendulum.Pendulum`_)
+``{{ next_execution_date }}``         the next execution date (`pendulum.Pendulum`_)
+``{{ params }}``                      a reference to the user-defined params dictionary which can be overridden by
+                                      the dictionary passed through ``trigger_dag -c`` if you enabled
+                                      ``dag_run_conf_overrides_params`` in ``airflow.cfg``
+``{{ prev_ds }}``                     the previous execution date as ``YYYY-MM-DD``
+                                      if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
+                                      ``{{ prev_ds }}`` will be ``2018-01-01``
+``{{ prev_ds_nodash }}``              the previous execution date as ``YYYYMMDD`` if exists, else ``None``
 ``{{ prev_execution_date }}``         the previous execution date (if available) (`pendulum.Pendulum`_)
 ``{{ prev_execution_date_success }}`` execution date from prior succesful dag run (if available) (`pendulum.Pendulum`_)
 ``{{ prev_start_date_success }}``     start date from prior successful dag run (if available) (`pendulum.Pendulum`_)
-``{{ next_execution_date }}``         the next execution date (`pendulum.Pendulum`_)
-``{{ dag }}``                         the DAG object
+``{{ run_id }}``                      the ``run_id`` of the current DAG run
 ``{{ task }}``                        the Task object
-``{{ macros }}``                      a reference to the macros package, described below
 ``{{ task_instance }}``               the task_instance object
-``{{ end_date }}``                    same as ``{{ ds }}``
-``{{ latest_date }}``                 same as ``{{ ds }}``
-``{{ ti }}``                          same as ``{{ task_instance }}``
-``{{ params }}``                      a reference to the user-defined params dictionary which can be overridden by
-                                      the dictionary passed through ``trigger_dag -c`` if you enabled
-                                      ``dag_run_conf_overrides_params` in ``airflow.cfg``
-``{{ var.value.my_var }}``            global defined variables represented as a dictionary
-``{{ var.json.my_var.path }}``        global defined variables represented as a dictionary
-                                      with deserialized JSON object, append the path to the
-                                      key within the JSON object
 ``{{ task_instance_key_str }}``       a unique, human-readable key to the task instance
                                       formatted ``{dag_id}_{task_id}_{ds}``
-``{{ conf }}``                        the full configuration object located at
-                                      ``airflow.configuration.conf`` which
-                                      represents the content of your
-                                      ``airflow.cfg``
-``{{ run_id }}``                      the ``run_id`` of the current DAG run
-``{{ dag_run }}``                     a reference to the DagRun object
 ``{{ test_mode }}``                   whether the task instance was called using
                                       the CLI's test subcommand
+``{{ ti }}``                          same as ``{{ task_instance }}``
+``{{ ts }}``                          same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
+``{{ ts_nodash }}``                   same as ``ts`` without ``-``, ``:`` and TimeZone info. Example: ``20180101T000000``
+``{{ ts_nodash_with_tz }}``           same as ``ts`` without ``-`` and ``:``. Example: ``20180101T000000+0000``
+``{{ var.json.my_var.path }}``        globally defined variables represented as a dictionary
+                                      with a deserialized JSON object; append the path to the
+                                      key within the JSON object
+``{{ var.value.my_var }}``            globally defined variables represented as a dictionary
 ===================================== ====================================
 
 Note that you can access the object's attributes and methods with simple
@@ -90,6 +84,16 @@ UI. You can access them as either plain-text or JSON. If you use JSON, you are
 also able to walk nested structures, such as dictionaries like:
 ``{{ var.json.my_dict_var.key1 }}``
 
+From the ``execution_date`` you can derive other dates. For example, say you need
+the date one day before the execution date, formatted as ``YYYY-MM-DD``:
+
+.. code-block:: python
+
+    {{ (execution_date - macros.timedelta(1)).strftime('%Y-%m-%d') }}
+
+The ``macros`` package is available in the task context and exposes a number
+of useful libraries, such as ``macros.timedelta``, for processing the context variables.
+
 Macros
 ------
 Macros are a way to expose objects to your templates and live under the
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 244cc2dbacabe..33f9bcab5401f 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -206,7 +206,6 @@ def test_test(self):
             output = out.getvalue()
 
             # Check that prints, and log messages, are shown
-            self.assertIn('END_DATE', output)
             self.assertIn("'example_python_operator__print_the_context__20180101'", output)
         finally:
             sys.stdout = saved_stdout
diff --git a/tests/core.py b/tests/core.py
index 0637d3cbc16ce..0f2db09001cd1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -689,12 +689,6 @@ def test_task_get_template(self):
         self.assertEqual(context['ts_nodash'], '20150101T000000')
         self.assertEqual(context['ts_nodash_with_tz'], '20150101T000000+0000')
 
-        self.assertEqual(context['yesterday_ds'], '2014-12-31')
-        self.assertEqual(context['yesterday_ds_nodash'], '20141231')
-
-        self.assertEqual(context['tomorrow_ds'], '2015-01-02')
-        self.assertEqual(context['tomorrow_ds_nodash'], '20150102')
-
     def test_import_examples(self):
         self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)
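
As a migration reference for the variables removed above, here is a minimal,
hypothetical DAG sketch. The DAG id, task id, schedule, and `tables` param are
illustrative assumptions, not code from this patch; it shows how templates that
previously used `yesterday_ds`, `tomorrow_ds_nodash`, and `tables` can derive
the same values from `execution_date` and `params`:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical DAG; the id, schedule, and params are placeholders.
dag = DAG(
    dag_id='context_variables_migration_example',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
    params={'tables': ['users', 'orders']},
)

# Before this patch: 'echo {{ yesterday_ds }} {{ tomorrow_ds_nodash }} {{ tables }}'
# After this patch: derive the same values from execution_date and params.
print_dates = BashOperator(
    task_id='print_dates',
    bash_command=(
        'echo yesterday={{ (execution_date - macros.timedelta(days=1)).strftime("%Y-%m-%d") }} '
        'tomorrow={{ (execution_date + macros.timedelta(days=1)).strftime("%Y%m%d") }} '
        'tables={{ params.get("tables", []) | join(",") }}'
    ),
    dag=dag,
)
```

Rendered for an execution date of 2019-01-02, the command prints
`yesterday=2019-01-01 tomorrow=20190103 tables=users,orders`.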