Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,7 @@ def get_template_context(
session = settings.Session()

from airflow import macros
from airflow.models.abstractoperator import NotMapped

integrate_macros_plugins()

Expand Down Expand Up @@ -1964,6 +1965,11 @@ def get_triggering_events() -> dict[str, list[DatasetEvent]]:

return triggering_events

try:
expanded_ti_count: int | None = task.get_mapped_ti_count(self.run_id, session=session)
except NotMapped:
expanded_ti_count = None

# NOTE: If you add anything to this dict, make sure to also update the
# definition in airflow/utils/context.pyi, and KNOWN_CONTEXT_KEYS in
# airflow/utils/context.py!
Expand All @@ -1976,6 +1982,7 @@ def get_triggering_events() -> dict[str, list[DatasetEvent]]:
"ds": ds,
"ds_nodash": ds_nodash,
"execution_date": logical_date,
"expanded_ti_count": expanded_ti_count,
"inlets": task.inlets,
"logical_date": logical_date,
"macros": macros,
Expand Down
1 change: 1 addition & 0 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
BASE_SERIALIZABLE_CONTEXT_KEYS = {
"ds",
"ds_nodash",
"expanded_ti_count",
"inlets",
"next_ds",
"next_ds_nodash",
Expand Down
5 changes: 3 additions & 2 deletions airflow/utils/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# undefined attribute errors from Mypy. Hopefully there will be a mechanism to
# declare "these are defined, but don't error if others are accessed" someday.

from typing import Any, Container, Iterable, Mapping, Union, overload
from typing import Any, Collection, Container, Iterable, Mapping, Union, overload

from pendulum import DateTime

Expand Down Expand Up @@ -61,8 +61,9 @@ class Context(TypedDict, total=False):
data_interval_start: DateTime
ds: str
ds_nodash: str
execution_date: DateTime
exception: Union[KeyboardInterrupt, Exception, str, None]
execution_date: DateTime
expanded_ti_count: int | None
inlets: list
logical_date: DateTime
macros: Any
Expand Down
85 changes: 42 additions & 43 deletions docs/apache-airflow/templates-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,48 @@ Variables
The Airflow engine passes a few variables by default that are accessible
in all templates

========================================== ====================================
Variable Description
========================================== ====================================
``{{ data_interval_start }}`` Start of the data interval (`pendulum.DateTime`_).
``{{ data_interval_end }}`` End of the data interval (`pendulum.DateTime`_).
``{{ ds }}`` The DAG run's logical date as ``YYYY-MM-DD``.
Same as ``{{ dag_run.logical_date | ds }}``.
``{{ ds_nodash }}`` Same as ``{{ dag_run.logical_date | ds_nodash }}``.
``{{ ts }}`` Same as ``{{ dag_run.logical_date | ts }}``.
Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``.
Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` Same as ``{{ dag_run.logical_date | ts_nodash }}``.
Example: ``20180101T000000``.
``{{ prev_data_interval_start_success }}`` Start of the data interval from prior successful DAG run
(`pendulum.DateTime`_ or ``None``).
``{{ prev_data_interval_end_success }}`` End of the data interval from prior successful DAG run
(`pendulum.DateTime`_ or ``None``).
``{{ prev_start_date_success }}`` Start date from prior successful dag run (if available)
(`pendulum.DateTime`_ or ``None``).
``{{ dag }}`` The DAG object.
``{{ task }}`` The Task object.
``{{ macros }}`` A reference to the macros package, described below.
``{{ task_instance }}`` The task_instance object.
``{{ ti }}`` Same as ``{{ task_instance }}``.
``{{ params }}`` A reference to the user-defined params dictionary which can be
overridden by the dictionary passed through ``trigger_dag -c`` if
you enabled ``dag_run_conf_overrides_params`` in ``airflow.cfg``.
``{{ var.value.my_var }}`` Global defined variables represented as a dictionary.
``{{ var.json.my_var.path }}`` Global defined variables represented as a dictionary.
With deserialized JSON object, append the path to the key within
the JSON object.
``{{ conn.my_conn_id }}`` Connection represented as a dictionary.
``{{ task_instance_key_str }}`` A unique, human-readable key to the task instance formatted
``{dag_id}__{task_id}__{ds_nodash}``.
``{{ conf }}`` The full configuration object located at
``airflow.configuration.conf`` which represents the content of
your ``airflow.cfg``.
``{{ run_id }}`` The ``run_id`` of the current DAG run.
``{{ dag_run }}`` A reference to the DagRun object.
``{{ test_mode }}`` Whether the task instance was called using the CLI's test
subcommand.
========================================== ====================================
=========================================== ===================== ===================================================================
Variable Type Description
=========================================== ===================== ===================================================================
``{{ data_interval_start }}`` `pendulum.DateTime`_ Start of the data interval. Added in version 2.3.
``{{ data_interval_end }}`` `pendulum.DateTime`_ End of the data interval. Added in version 2.3.
``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``.
| Same as ``{{ dag_run.logical_date | ds }}``.
``{{ ds_nodash }}`` str Same as ``{{ dag_run.logical_date | ds_nodash }}``.
``{{ ts }}`` str | Same as ``{{ dag_run.logical_date | ts }}``.
| Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` str | Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``.
| Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` str | Same as ``{{ dag_run.logical_date | ts_nodash }}``.
| Example: ``20180101T000000``.
``{{ prev_data_interval_start_success }}`` `pendulum.DateTime`_ | Start of the data interval of the prior successful DAG run.
| ``None`` | Added in version 2.3.
``{{ prev_data_interval_end_success }}`` `pendulum.DateTime`_ | End of the data interval of the prior successful DAG run.
| ``None`` | Added in version 2.3.
``{{ prev_start_date_success }}`` `pendulum.DateTime`_ Start date from prior successful dag run (if available).
| ``None``
``{{ dag }}`` DAG The currently running DAG.
``{{ task }}`` BaseOperator | The currently running task.
``{{ macros }}`` | A reference to the macros package. See Macros_ below.
``{{ task_instance }}`` TaskInstance The currently running task instance.
``{{ ti }}`` TaskInstance Same as ``{{ task_instance }}``.
``{{ params }}`` dict[str, Any] | The user-defined params. This can be overridden by the mapping
| passed to ``trigger_dag -c`` if ``dag_run_conf_overrides_params``
| is enabled in ``airflow.cfg``.
``{{ var.value }}`` Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ var.json }}`` Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ conn }}`` Airflow connections. See `Airflow Connections in Templates`_ below.
``{{ task_instance_key_str }}`` str | A unique, human-readable key to the task instance. The format is
| ``{dag_id}__{task_id}__{ds_nodash}``.
``{{ conf }}`` AirflowConfigParser | The full configuration object representing the content of your
| ``airflow.cfg``. See :mod:`airflow.configuration.conf`.
``{{ run_id }}`` str The currently running DAG run's run ID.
``{{ dag_run }}`` DagRun The currently running DAG run.
``{{ test_mode }}`` bool Whether the task instance was run by the ``airflow test`` CLI.
``{{ expanded_ti_count }}`` int | ``None`` | Number of task instances that a mapped task was expanded into. If
| the current task is not mapped, this should be ``None``.
| Added in version 2.5.
=========================================== ===================== ===================================================================

.. note::

Expand Down