Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ assists users migrating to a new version.
## Airflow Master


### Some task context variables are removed
The following task context variables are removed. Some are obsolete, some are duplicates whose
value is also given via other variables, and some are "derived" values from other variables which
are also given in the task context.

* `yesterday_ds`: can be derived from `execution_date` (`{{ (execution_date - macros.timedelta(1)).strftime('%Y-%m-%d') }}`)
* `yesterday_ds_nodash`: can be derived from `execution_date` (`{{ (execution_date - macros.timedelta(1)).strftime('%Y%m%d') }}`)
* `tomorrow_ds`: can be derived from `execution_date` (`{{ (execution_date + macros.timedelta(1)).strftime('%Y-%m-%d') }}`)
* `tomorrow_ds_nodash`: can be derived from `execution_date` (`{{ (execution_date + macros.timedelta(1)).strftime('%Y%m%d') }}`)
* `tables`: also given via params (`{{ params.get('tables', None) }}`)
* `latest_date`: same value as `ds`
* `inlets`: unused
* `outlets`: unused
* `end_date`: same value as `ds`
* `END_DATE`: same value as `ds`

For more information, see https://github.com/apache/airflow/pull/5010.

### `pool` config option in Celery section to support different Celery pool implementation

The new `pool` config option allows users to choose different pool
Expand Down
3 changes: 2 additions & 1 deletion airflow/contrib/example_dags/example_qubole_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
{"column": "month", "values":
["{{ ds.split('-')[1] }}"]},
{"column": "day", "values":
["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]}
["{{ ds.split('-')[2] }}",
"{{ (execution_date - macros.timedelta(days=1)).strftime('%d') }}"]}
] # will check for partitions like [month=12/day=12,month=12/day=13]
},
dag=dag
Expand Down
51 changes: 17 additions & 34 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,6 @@ def is_eligible_to_retry(self):
def get_template_context(self, session=None):
task = self.task
from airflow import macros
tables = None
if 'tables' in task.params:
tables = task.params['tables']

params = {}
run_id = ''
Expand All @@ -1124,11 +1121,6 @@ def get_template_context(self, session=None):
session.expunge_all()
session.commit()

ds = self.execution_date.strftime('%Y-%m-%d')
ts = self.execution_date.isoformat()
yesterday_ds = (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')

# For manually triggered dagruns that aren't run on a schedule, next/previous
# schedule dates don't make sense, and should be set to execution date for
# consistency with how execution_date is set for manually triggered tasks, i.e.
Expand All @@ -1140,6 +1132,9 @@ def get_template_context(self, session=None):
prev_execution_date = task.dag.previous_schedule(self.execution_date)
next_execution_date = task.dag.following_schedule(self.execution_date)

ds = self.execution_date.strftime('%Y-%m-%d')
ds_nodash = ds.replace('-', '')

next_ds = None
next_ds_nodash = None
if next_execution_date:
Expand All @@ -1152,11 +1147,9 @@ def get_template_context(self, session=None):
prev_ds = prev_execution_date.strftime('%Y-%m-%d')
prev_ds_nodash = prev_ds.replace('-', '')

ds_nodash = ds.replace('-', '')
ts = self.execution_date.isoformat()
ts_nodash = self.execution_date.strftime('%Y%m%dT%H%M%S')
ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
yesterday_ds_nodash = yesterday_ds.replace('-', '')
tomorrow_ds_nodash = tomorrow_ds.replace('-', '')

ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash)
Expand Down Expand Up @@ -1198,46 +1191,36 @@ def __repr__(self):
return str(self.var)

return {
'conf': configuration,
'dag': task.dag,
'dag_run': dag_run,
'ds': ds,
'ds_nodash': ds_nodash,
'execution_date': self.execution_date,
'macros': macros,
'next_ds': next_ds,
'next_ds_nodash': next_ds_nodash,
'next_execution_date': next_execution_date,
'params': params,
'prev_ds': prev_ds,
'prev_ds_nodash': prev_ds_nodash,
'ds_nodash': ds_nodash,
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
'yesterday_ds': yesterday_ds,
'yesterday_ds_nodash': yesterday_ds_nodash,
'tomorrow_ds': tomorrow_ds,
'tomorrow_ds_nodash': tomorrow_ds_nodash,
'END_DATE': ds,
'end_date': ds,
'dag_run': dag_run,
'run_id': run_id,
'execution_date': self.execution_date,
'prev_execution_date': prev_execution_date,
'prev_execution_date_success': lazy_object_proxy.Proxy(
lambda: self.previous_execution_date_success),
'prev_start_date_success': lazy_object_proxy.Proxy(lambda: self.previous_start_date_success),
'next_execution_date': next_execution_date,
'latest_date': ds,
'macros': macros,
'params': params,
'tables': tables,
'run_id': run_id,
'task': task,
'task_instance': self,
'ti': self,
'task_instance_key_str': ti_key_str,
'conf': configuration,
'test_mode': self.test_mode,
'ti': self,
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
'var': {
'json': VariableJsonAccessor(),
'value': VariableAccessor(),
'json': VariableJsonAccessor()
},
'inlets': task.inlets,
'outlets': task.outlets,
}

def overwrite_params_with_dag_run_conf(self, params, dag_run):
Expand Down
4 changes: 3 additions & 1 deletion airflow/operators/hive_to_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ def __init__(
self.sql = sql
self.druid_datasource = druid_datasource
self.ts_dim = ts_dim
self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
self.intervals = intervals or [
'{{ ds }}/{{ (execution_date + macros.timedelta(days=1)).strftime("%Y-%m-%d") }}'
]
self.num_shards = num_shards
self.target_partition_size = target_partition_size
self.query_granularity = query_granularity
Expand Down
68 changes: 36 additions & 32 deletions docs/macros.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,51 @@ Additional custom macros can be added globally through :ref:`plugins`, or at a D
Default Variables
-----------------
The Airflow engine passes a few variables by default that are accessible
in all templates
in all templates:

===================================== ====================================
Variable Description
===================================== ====================================
``{{ conf }}`` the full configuration object located at
``airflow.configuration.conf`` which
represents the content of your
``airflow.cfg``
``{{ dag }}`` the DAG object
``{{ dag_run }}`` a reference to the DagRun object
``{{ ds }}`` the execution date as ``YYYY-MM-DD``
``{{ ds_nodash }}`` the execution date as ``YYYYMMDD``
``{{ prev_ds }}`` the previous execution date as ``YYYY-MM-DD``
if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
``{{ prev_ds }}`` will be ``2018-01-01``
``{{ prev_ds_nodash }}`` the previous execution date as ``YYYYMMDD`` if exists, else ``None``
``{{ execution_date }}`` the execution_date (`pendulum.Pendulum`_)
``{{ macros }}`` a reference to the macros package, described below
``{{ next_ds }}`` the next execution date as ``YYYY-MM-DD``
if ``{{ ds }}`` is ``2018-01-01`` and ``schedule_interval`` is ``@weekly``,
``{{ next_ds }}`` will be ``2018-01-08``
``{{ next_ds_nodash }}`` the next execution date as ``YYYYMMDD`` if exists, else ``None``
``{{ yesterday_ds }}`` the day before the execution date as ``YYYY-MM-DD``
``{{ yesterday_ds_nodash }}`` the day before the execution date as ``YYYYMMDD``
``{{ tomorrow_ds }}`` the day after the execution date as ``YYYY-MM-DD``
``{{ tomorrow_ds_nodash }}`` the day after the execution date as ``YYYYMMDD``
``{{ ts }}`` same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
``{{ ts_nodash }}`` same as ``ts`` without ``-``, ``:`` and TimeZone info. Example: ``20180101T000000``
``{{ ts_nodash_with_tz }}`` same as ``ts`` without ``-`` and ``:``. Example: ``20180101T000000+0000``
``{{ execution_date }}`` the execution_date (`pendulum.Pendulum`_)
``{{ next_execution_date }}`` the next execution date (`pendulum.Pendulum`_)
``{{ params }}`` a reference to the user-defined params dictionary which can be overridden by
the dictionary passed through ``trigger_dag -c`` if you enabled
``dag_run_conf_overrides_params`` in ``airflow.cfg``
``{{ prev_ds }}`` the previous execution date as ``YYYY-MM-DD``
if ``{{ ds }}`` is ``2018-01-08`` and ``schedule_interval`` is ``@weekly``,
``{{ prev_ds }}`` will be ``2018-01-01``
``{{ prev_ds_nodash }}`` the previous execution date as ``YYYYMMDD`` if exists, else ``None``
``{{ prev_execution_date }}`` the previous execution date (if available) (`pendulum.Pendulum`_)
``{{ prev_execution_date_success }}`` execution date from prior successful dag run (if available) (`pendulum.Pendulum`_)
``{{ prev_start_date_success }}`` start date from prior successful dag run (if available) (`pendulum.Pendulum`_)
``{{ next_execution_date }}`` the next execution date (`pendulum.Pendulum`_)
``{{ dag }}`` the DAG object
``{{ run_id }}`` the ``run_id`` of the current DAG run
``{{ task }}`` the Task object
``{{ macros }}`` a reference to the macros package, described below
``{{ task_instance }}`` the task_instance object
``{{ end_date }}`` same as ``{{ ds }}``
``{{ latest_date }}`` same as ``{{ ds }}``
``{{ ti }}`` same as ``{{ task_instance }}``
``{{ params }}`` a reference to the user-defined params dictionary which can be overridden by
the dictionary passed through ``trigger_dag -c`` if you enabled
``dag_run_conf_overrides_params`` in ``airflow.cfg``
``{{ var.value.my_var }}`` global defined variables represented as a dictionary
``{{ var.json.my_var.path }}`` global defined variables represented as a dictionary
with deserialized JSON object, append the path to the
key within the JSON object
``{{ task_instance_key_str }}`` a unique, human-readable key to the task instance
                                      formatted ``{dag_id}__{task_id}__{ds_nodash}``
``{{ conf }}`` the full configuration object located at
``airflow.configuration.conf`` which
represents the content of your
``airflow.cfg``
``{{ run_id }}`` the ``run_id`` of the current DAG run
``{{ dag_run }}`` a reference to the DagRun object
``{{ test_mode }}`` whether the task instance was called using
the CLI's test subcommand
``{{ ti }}`` same as ``{{ task_instance }}``
``{{ ts }}`` same as ``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
``{{ ts_nodash }}`` same as ``ts`` without ``-``, ``:`` and TimeZone info. Example: ``20180101T000000``
``{{ ts_nodash_with_tz }}`` same as ``ts`` without ``-`` and ``:``. Example: ``20180101T000000+0000``
``{{ var.json.my_var.path }}`` global defined variables represented as a dictionary
with deserialized JSON object, append the path to the
key within the JSON object
``{{ var.value.my_var }}`` global defined variables represented as a dictionary
===================================== ====================================

Note that you can access the object's attributes and methods with simple
Expand All @@ -90,6 +84,16 @@ UI. You can access them as either plain-text or JSON. If you use JSON, you are
also able to walk nested structures, such as dictionaries like:
``{{ var.json.my_dict_var.key1 }}``

From the ``execution_date`` you can derive other dates. For example, say you need
the date one day before the execution date, formatted as ``YYYY-MM-DD``:

.. code-block:: jinja

   {{ (execution_date - macros.timedelta(1)).strftime('%Y-%m-%d') }}

The ``macros`` package is available in the context variables and contains a number
of other useful utilities for processing the context variables.

Macros
------
Macros are a way to expose objects to your templates and live under the
Expand Down
1 change: 0 additions & 1 deletion tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def test_test(self):

output = out.getvalue()
# Check that prints, and log messages, are shown
self.assertIn('END_DATE', output)
self.assertIn("'example_python_operator__print_the_context__20180101'", output)
finally:
sys.stdout = saved_stdout
Expand Down
6 changes: 0 additions & 6 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,12 +689,6 @@ def test_task_get_template(self):
self.assertEqual(context['ts_nodash'], '20150101T000000')
self.assertEqual(context['ts_nodash_with_tz'], '20150101T000000+0000')

self.assertEqual(context['yesterday_ds'], '2014-12-31')
self.assertEqual(context['yesterday_ds_nodash'], '20141231')

self.assertEqual(context['tomorrow_ds'], '2015-01-02')
self.assertEqual(context['tomorrow_ds_nodash'], '20150102')

def test_import_examples(self):
self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)

Expand Down