From 4209964ed269b071fc718ce2384e51126a3d2744 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 15 Nov 2022 14:34:18 +0800 Subject: [PATCH 1/2] Add expanded_ti_count to ti context This value indicates how many tis the current mapped task is mapped into. This will be used to resolve values from parent a mapped task group without needing to call the retrival function repeatedly (which incurs database overhead). --- airflow/models/taskinstance.py | 7 +++++++ airflow/operators/python.py | 1 + airflow/utils/context.pyi | 5 +++-- docs/apache-airflow/templates-ref.rst | 2 ++ 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 197d7c6fc5fa6..4b4d9241e9af4 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1836,6 +1836,7 @@ def get_template_context( session = settings.Session() from airflow import macros + from airflow.models.abstractoperator import NotMapped integrate_macros_plugins() @@ -1964,6 +1965,11 @@ def get_triggering_events() -> dict[str, list[DatasetEvent]]: return triggering_events + try: + expanded_ti_count: int | None = task.get_mapped_ti_count(self.run_id, session=session) + except NotMapped: + expanded_ti_count = None + # NOTE: If you add anything to this dict, make sure to also update the # definition in airflow/utils/context.pyi, and KNOWN_CONTEXT_KEYS in # airflow/utils/context.py! @@ -1976,6 +1982,7 @@ def get_triggering_events() -> dict[str, list[DatasetEvent]]: "ds": ds, "ds_nodash": ds_nodash, "execution_date": logical_date, + "expanded_ti_count": expanded_ti_count, "inlets": task.inlets, "logical_date": logical_date, "macros": macros, diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 17f1dc1360bc0..0a0dd34fef355 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -274,6 +274,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): BASE_SERIALIZABLE_CONTEXT_KEYS = { "ds", "ds_nodash", + "expanded_ti_count", "inlets", "next_ds", "next_ds_nodash", diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index 998360cef2c93..c7bab20c85442 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -25,7 +25,7 @@ # undefined attribute errors from Mypy. Hopefully there will be a mechanism to # declare "these are defined, but don't error if others are accessed" someday. -from typing import Any, Container, Iterable, Mapping, Union, overload +from typing import Any, Collection, Container, Iterable, Mapping, Union, overload from pendulum import DateTime @@ -61,8 +61,9 @@ class Context(TypedDict, total=False): data_interval_start: DateTime ds: str ds_nodash: str - execution_date: DateTime exception: Union[KeyboardInterrupt, Exception, str, None] + execution_date: DateTime + expanded_ti_count: int | None inlets: list logical_date: DateTime macros: Any diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst index da9a526152a46..142b91d9f08da 100644 --- a/docs/apache-airflow/templates-ref.rst +++ b/docs/apache-airflow/templates-ref.rst @@ -75,6 +75,8 @@ Variable Description ``{{ dag_run }}`` A reference to the DagRun object. ``{{ test_mode }}`` Whether the task instance was called using the CLI's test subcommand. +``{{ expanded_ti_count }}`` Number of task instances that a mapped task was expanded into. If + the current task is not mapped, this should be ``None``. ========================================== ==================================== .. note:: From 670f464410859caba9c8a8d68edd500fa67f6ba2 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 16 Nov 2022 06:41:22 +0800 Subject: [PATCH 2/2] Remake template variable table Type information is separated into a separate column, and some additional information is added to the description. --- docs/apache-airflow/templates-ref.rst | 87 +++++++++++++-------------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst index 142b91d9f08da..c49077ef67f5f 100644 --- a/docs/apache-airflow/templates-ref.rst +++ b/docs/apache-airflow/templates-ref.rst @@ -33,51 +33,48 @@ Variables The Airflow engine passes a few variables by default that are accessible in all templates -========================================== ==================================== -Variable Description -========================================== ==================================== -``{{ data_interval_start }}`` Start of the data interval (`pendulum.DateTime`_). -``{{ data_interval_end }}`` End of the data interval (`pendulum.DateTime`_). -``{{ ds }}`` The DAG run's logical date as ``YYYY-MM-DD``. - Same as ``{{ dag_run.logical_date | ds }}``. -``{{ ds_nodash }}`` Same as ``{{ dag_run.logical_date | ds_nodash }}``. -``{{ ts }}`` Same as ``{{ dag_run.logical_date | ts }}``. - Example: ``2018-01-01T00:00:00+00:00``. -``{{ ts_nodash_with_tz }}`` Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``. - Example: ``20180101T000000+0000``. -``{{ ts_nodash }}`` Same as ``{{ dag_run.logical_date | ts_nodash }}``. - Example: ``20180101T000000``. -``{{ prev_data_interval_start_success }}`` Start of the data interval from prior successful DAG run - (`pendulum.DateTime`_ or ``None``). -``{{ prev_data_interval_end_success }}`` End of the data interval from prior successful DAG run - (`pendulum.DateTime`_ or ``None``). -``{{ prev_start_date_success }}`` Start date from prior successful dag run (if available) - (`pendulum.DateTime`_ or ``None``). -``{{ dag }}`` The DAG object. -``{{ task }}`` The Task object. -``{{ macros }}`` A reference to the macros package, described below. -``{{ task_instance }}`` The task_instance object. -``{{ ti }}`` Same as ``{{ task_instance }}``. -``{{ params }}`` A reference to the user-defined params dictionary which can be - overridden by the dictionary passed through ``trigger_dag -c`` if - you enabled ``dag_run_conf_overrides_params`` in ``airflow.cfg``. -``{{ var.value.my_var }}`` Global defined variables represented as a dictionary. -``{{ var.json.my_var.path }}`` Global defined variables represented as a dictionary. - With deserialized JSON object, append the path to the key within - the JSON object. -``{{ conn.my_conn_id }}`` Connection represented as a dictionary. -``{{ task_instance_key_str }}`` A unique, human-readable key to the task instance formatted - ``{dag_id}__{task_id}__{ds_nodash}``. -``{{ conf }}`` The full configuration object located at - ``airflow.configuration.conf`` which represents the content of - your ``airflow.cfg``. -``{{ run_id }}`` The ``run_id`` of the current DAG run. -``{{ dag_run }}`` A reference to the DagRun object. -``{{ test_mode }}`` Whether the task instance was called using the CLI's test - subcommand. -``{{ expanded_ti_count }}`` Number of task instances that a mapped task was expanded into. If - the current task is not mapped, this should be ``None``. -========================================== ==================================== +=========================================== ===================== =================================================================== +Variable Type Description +=========================================== ===================== =================================================================== +``{{ data_interval_start }}`` `pendulum.DateTime`_ Start of the data interval. Added in version 2.3. +``{{ data_interval_end }}`` `pendulum.DateTime`_ End of the data interval. Added in version 2.3. +``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``. + | Same as ``{{ dag_run.logical_date | ds }}``. +``{{ ds_nodash }}`` str Same as ``{{ dag_run.logical_date | ds_nodash }}``. +``{{ ts }}`` str | Same as ``{{ dag_run.logical_date | ts }}``. + | Example: ``2018-01-01T00:00:00+00:00``. +``{{ ts_nodash_with_tz }}`` str | Same as ``{{ dag_run.logical_date | ts_nodash_with_tz }}``. + | Example: ``20180101T000000+0000``. +``{{ ts_nodash }}`` str | Same as ``{{ dag_run.logical_date | ts_nodash }}``. + | Example: ``20180101T000000``. +``{{ prev_data_interval_start_success }}`` `pendulum.DateTime`_ | Start of the data interval of the prior successful DAG run. + | ``None`` | Added in version 2.3. +``{{ prev_data_interval_end_success }}`` `pendulum.DateTime`_ | End of the data interval of the prior successful DAG run. + | ``None`` | Added in version 2.3. +``{{ prev_start_date_success }}`` `pendulum.DateTime`_ Start date from prior successful dag run (if available). + | ``None`` +``{{ dag }}`` DAG The currently running DAG. +``{{ task }}`` BaseOperator | The currently running task. +``{{ macros }}`` | A reference to the macros package. See Macros_ below. +``{{ task_instance }}`` TaskInstance The currently running task instance. +``{{ ti }}`` TaskInstance Same as ``{{ task_instance }}``. +``{{ params }}`` dict[str, Any] | The user-defined params. This can be overridden by the mapping + | passed to ``trigger_dag -c`` if ``dag_run_conf_overrides_params`` + | is enabled in ``airflow.cfg``. +``{{ var.value }}`` Airflow variables. See `Airflow Variables in Templates`_ below. +``{{ var.json }}`` Airflow variables. See `Airflow Variables in Templates`_ below. +``{{ conn }}`` Airflow connections. See `Airflow Connections in Templates`_ below. +``{{ task_instance_key_str }}`` str | A unique, human-readable key to the task instance. The format is + | ``{dag_id}__{task_id}__{ds_nodash}``. +``{{ conf }}`` AirflowConfigParser | The full configuration object representing the content of your + | ``airflow.cfg``. See :mod:`airflow.configuration.conf`. +``{{ run_id }}`` str The currently running DAG run's run ID. +``{{ dag_run }}`` DagRun The currently running DAG run. +``{{ test_mode }}`` bool Whether the task instance was run by the ``airflow test`` CLI. +``{{ expanded_ti_count }}`` int | ``None`` | Number of task instances that a mapped task was expanded into. If + | the current task is not mapped, this should be ``None``. + | Added in version 2.5. +=========================================== ===================== =================================================================== .. note::