` for more information about the data collected and how to opt-out.
-
Troubleshooting
'''''''''''''''
diff --git a/docs/apache-airflow/installation/supported-versions.rst b/docs/apache-airflow/installation/supported-versions.rst
index c8fc9c8293ee1..dacd2da7bde09 100644
--- a/docs/apache-airflow/installation/supported-versions.rst
+++ b/docs/apache-airflow/installation/supported-versions.rst
@@ -29,7 +29,7 @@ Apache Airflow® version life cycle:
========= ===================== ========= =============== ================= ================
Version Current Patch/Minor State First Release Limited Support EOL/Terminated
========= ===================== ========= =============== ================= ================
-2 2.10.4 Supported Dec 17, 2020 TBD TBD
+2 2.10.5 Supported Dec 17, 2020 TBD TBD
1.10 1.10.15 EOL Aug 27, 2018 Dec 17, 2020 June 17, 2021
1.9 1.9.0 EOL Jan 03, 2018 Aug 27, 2018 Aug 27, 2018
1.8 1.8.2 EOL Mar 19, 2017 Jan 03, 2018 Jan 03, 2018
diff --git a/docs/docker-stack/README.md b/docs/docker-stack/README.md
index 9369660e2ccc2..5db0badc469ed 100644
--- a/docs/docker-stack/README.md
+++ b/docs/docker-stack/README.md
@@ -31,12 +31,12 @@ Every time a new version of Airflow is released, the images are prepared in the
[apache/airflow DockerHub](https://hub.docker.com/r/apache/airflow)
for all the supported Python versions.
-You can find the following images there (Assuming Airflow version `2.10.4`):
+You can find the following images there (Assuming Airflow version `2.10.5`):
* `apache/airflow:latest` - the latest released Airflow image with default Python version (3.8 currently)
* `apache/airflow:latest-pythonX.Y` - the latest released Airflow image with specific Python version
-* `apache/airflow:2.10.4` - the versioned Airflow image with default Python version (3.8 currently)
-* `apache/airflow:2.10.4-pythonX.Y` - the versioned Airflow image with specific Python version
+* `apache/airflow:2.10.5` - the versioned Airflow image with default Python version (3.8 currently)
+* `apache/airflow:2.10.5-pythonX.Y` - the versioned Airflow image with specific Python version
Those are "reference" regular images. They contain the most common set of extras, dependencies and providers that are
often used by the users and they are good to "try-things-out" when you want to just take Airflow for a spin,
@@ -47,8 +47,8 @@ via [Building the image](https://airflow.apache.org/docs/docker-stack/build.html
* `apache/airflow:slim-latest` - the latest released Airflow image with default Python version (3.8 currently)
* `apache/airflow:slim-latest-pythonX.Y` - the latest released Airflow image with specific Python version
-* `apache/airflow:slim-2.10.4` - the versioned Airflow image with default Python version (3.8 currently)
-* `apache/airflow:slim-2.10.4-pythonX.Y` - the versioned Airflow image with specific Python version
+* `apache/airflow:slim-2.10.5` - the versioned Airflow image with default Python version (3.8 currently)
+* `apache/airflow:slim-2.10.5-pythonX.Y` - the versioned Airflow image with specific Python version
The Apache Airflow image provided as a convenience package is optimized for size, and
it provides just a bare minimal set of the extras and dependencies installed and in most cases
diff --git a/docs/docker-stack/docker-examples/extending/add-airflow-configuration/Dockerfile b/docs/docker-stack/docker-examples/extending/add-airflow-configuration/Dockerfile
index 5fb16b7ced047..ebb3c9e13fe64 100644
--- a/docs/docker-stack/docker-examples/extending/add-airflow-configuration/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-airflow-configuration/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
ENV AIRFLOW__CORE__LOAD_EXAMPLES=True
ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=my_conn_string
# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
index 72346ed959730..221b3dcdf6dd8 100644
--- a/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-apt-packages/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
index 2bb166deb0ba0..399569c296198 100644
--- a/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-build-essential-extend/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
index 46584a972c3ef..50bc0e745e7f3 100644
--- a/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
diff --git a/docs/docker-stack/docker-examples/extending/add-pypi-packages-constraints/Dockerfile b/docs/docker-stack/docker-examples/extending/add-pypi-packages-constraints/Dockerfile
index d0a73412945d0..845b1107b9440 100644
--- a/docs/docker-stack/docker-examples/extending/add-pypi-packages-constraints/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-pypi-packages-constraints/Dockerfile
@@ -15,6 +15,6 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" lxml --constraint "${HOME}/constraints.txt"
# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-pypi-packages-uv/Dockerfile b/docs/docker-stack/docker-examples/extending/add-pypi-packages-uv/Dockerfile
index 06082308dc483..c6f87ff024bce 100644
--- a/docs/docker-stack/docker-examples/extending/add-pypi-packages-uv/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-pypi-packages-uv/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
# The `uv` tool is a Rust packaging tool that is much faster than `pip` and other installers
# Support for uv as an installation tool is experimental
diff --git a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
index fe19bacf174cc..d6ca1b31faf51 100644
--- a/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-pypi-packages/Dockerfile
@@ -15,6 +15,6 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" lxml
# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile
index ecbbf1984bbce..aebc7f49b23ba 100644
--- a/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt
# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/custom-providers/Dockerfile b/docs/docker-stack/docker-examples/extending/custom-providers/Dockerfile
index 363debd15c43e..4112a4b5a89e4 100644
--- a/docs/docker-stack/docker-examples/extending/custom-providers/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/custom-providers/Dockerfile
@@ -15,6 +15,6 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
RUN pip install "apache-airflow==${AIRFLOW_VERSION}" --no-cache-dir apache-airflow-providers-docker==2.5.1
# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
index 59f395d1728df..1a4672f84f968 100644
--- a/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/embedding-dags/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
COPY --chown=airflow:root test_dag.py /opt/airflow/dags
diff --git a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
index 7e3cee6464585..a72f2bed5bd9b 100644
--- a/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
+++ b/docs/docker-stack/docker-examples/extending/writable-directory/Dockerfile
@@ -15,7 +15,7 @@
# This is an example Dockerfile. It is not intended for PRODUCTION use
# [START Dockerfile]
-FROM apache/airflow:2.10.4
+FROM apache/airflow:2.10.5
RUN umask 0002; \
mkdir -p ~/writeable-directory
# [END Dockerfile]
diff --git a/docs/docker-stack/entrypoint.rst b/docs/docker-stack/entrypoint.rst
index 5c0d0d0a432a6..0da779d8bec85 100644
--- a/docs/docker-stack/entrypoint.rst
+++ b/docs/docker-stack/entrypoint.rst
@@ -132,7 +132,7 @@ if you specify extra arguments. For example:
.. code-block:: bash
- docker run -it apache/airflow:2.10.4-python3.8 bash -c "ls -la"
+ docker run -it apache/airflow:2.10.5-python3.8 bash -c "ls -la"
total 16
drwxr-xr-x 4 airflow root 4096 Jun 5 18:12 .
drwxr-xr-x 1 root root 4096 Jun 5 18:12 ..
@@ -144,7 +144,7 @@ you pass extra parameters. For example:
.. code-block:: bash
- > docker run -it apache/airflow:2.10.4-python3.8 python -c "print('test')"
+ > docker run -it apache/airflow:2.10.5-python3.8 python -c "print('test')"
test
If the first argument equals "airflow", the rest of the arguments are treated as an airflow command
@@ -152,13 +152,13 @@ to execute. Example:
.. code-block:: bash
- docker run -it apache/airflow:2.10.4-python3.8 airflow webserver
+ docker run -it apache/airflow:2.10.5-python3.8 airflow webserver
If there are any other arguments, they are simply passed to the "airflow" command
.. code-block:: bash
- > docker run -it apache/airflow:2.10.4-python3.8 help
+ > docker run -it apache/airflow:2.10.5-python3.8 help
usage: airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
@@ -363,7 +363,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
--env "_AIRFLOW_DB_MIGRATE=true" \
--env "_AIRFLOW_WWW_USER_CREATE=true" \
--env "_AIRFLOW_WWW_USER_PASSWORD=admin" \
- apache/airflow:2.10.4-python3.8 webserver
+ apache/airflow:2.10.5-python3.8 webserver
.. code-block:: bash
@@ -372,7 +372,7 @@ database and creating an ``admin/admin`` Admin user with the following command:
--env "_AIRFLOW_DB_MIGRATE=true" \
--env "_AIRFLOW_WWW_USER_CREATE=true" \
--env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
- apache/airflow:2.10.4-python3.8 webserver
+ apache/airflow:2.10.5-python3.8 webserver
The commands above initialize the SQLite database and create an ``admin`` user with the ``admin`` password
and the Admin role. They also forward local port ``8080`` to the webserver port and finally start the webserver.
@@ -412,6 +412,6 @@ Example:
--env "_AIRFLOW_DB_MIGRATE=true" \
--env "_AIRFLOW_WWW_USER_CREATE=true" \
--env "_AIRFLOW_WWW_USER_PASSWORD_CMD=echo admin" \
- apache/airflow:2.10.4-python3.8 webserver
+ apache/airflow:2.10.5-python3.8 webserver
This method is only available in Docker images of Airflow 2.1.1 and above.
diff --git a/docs/exts/airflow_intersphinx.py b/docs/exts/airflow_intersphinx.py
index b0fecdec9b7b2..ccfd6662be3b2 100644
--- a/docs/exts/airflow_intersphinx.py
+++ b/docs/exts/airflow_intersphinx.py
@@ -126,7 +126,7 @@ def fetch_inventories(intersphinx_mapping) -> dict[str, Any]:
cache: dict[Any, Any] = {}
with concurrent.futures.ThreadPoolExecutor() as pool:
for name, (uri, invs) in intersphinx_mapping.values():
- pool.submit(fetch_inventory_group, name, uri, invs, cache, _MockApp(), now)
+ pool.submit(fetch_inventory_group, name, uri, invs, cache, _MockApp(), now) # type: ignore[arg-type]
inv_dict = {}
for uri, (name, now, invdata) in cache.items():
diff --git a/docs/sphinx_design/static/custom.css b/docs/sphinx_design/static/custom.css
index b1cf49f37d486..70356c06a97ca 100644
--- a/docs/sphinx_design/static/custom.css
+++ b/docs/sphinx_design/static/custom.css
@@ -31,3 +31,38 @@
--sd-color-tabs-underline-hover: #68d1ff;
--sd-color-tabs-underline: transparent;
}
+
+div.admonition.warning {
+ background: #e8cccc;
+ font-weight: bolder;
+}
+
+.rst-content .warning .admonition-title {
+ background: #cc341d;
+}
+
+/* Patches as of the move to Sphinx 7 to restore the previous layout */
+/* Needs to be cleaned up in a follow-up to source this from the original style in */
+/* https://github.com/apache/airflow-site/blob/main/landing-pages/site/assets/scss/_rst-content.scss */
+.base-layout {
+ padding-top: 123px !important;
+}
+
+section {
+ padding-top: 0rem !important;
+ padding-bottom: 0rem !important;
+}
+
+section ol li p:last-child, section ul li p:last-child {
+ margin-bottom: 0 !important;
+}
+
+a.headerlink {
+ content: "" !important;
+ font-size: 75% !important;
+}
+
+a.headerlink::after {
+ content: " [link]" !important; /* Theme image not existing */
+ visibility: visible !important;
+}
diff --git a/hatch_build.py b/hatch_build.py
index 8cb883f002690..13fa2a44330f3 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -156,37 +156,28 @@
DOC_EXTRAS: dict[str, list[str]] = {
"doc": [
- "astroid>=2.12.3,<3.0",
- "checksumdir>=1.2.0",
- # click 8.1.4 and 8.1.5 generate mypy errors due to typing issue in the upstream package:
- # https://github.com/pallets/click/issues/2558
- "click>=8.0,!=8.1.4,!=8.1.5",
- # Docutils 0.17.0 converts generated into and breaks our doc formatting
- # By adding a lot of whitespace separation. This limit can be lifted when we update our doc to handle
- # tags for sections
- "docutils<0.17,>=0.16",
- # The new theme 0.1.0 is for Sphinx 7. Airflow 2. still uses old version of Sphinx
- "sphinx-airflow-theme>=0.0.12,<0.1.0",
- "sphinx-argparse>=0.4.0",
- # sphinx-autoapi fails with astroid 3.0, see: https://github.com/readthedocs/sphinx-autoapi/issues/407
- # This was fixed in sphinx-autoapi 3.0, however it has requirement sphinx>=6.1, but we stuck on 5.x
- "sphinx-autoapi>=2.1.1",
- "sphinx-copybutton>=0.5.2",
- "sphinx-design>=0.5.0",
- "sphinx-jinja>=2.0.2",
- "sphinx-rtd-theme>=2.0.0",
- # Currently we are using sphinx 5 but we need to migrate to Sphinx 7
- "sphinx>=5.3.0,<6.0.0",
- "sphinxcontrib-applehelp>=1.0.4",
- "sphinxcontrib-devhelp>=1.0.2",
- "sphinxcontrib-htmlhelp>=2.0.1",
- "sphinxcontrib-httpdomain>=1.8.1",
- "sphinxcontrib-jquery>=4.1",
- "sphinxcontrib-jsmath>=1.0.1",
- "sphinxcontrib-qthelp>=1.0.3",
- "sphinxcontrib-redoc>=1.6.0",
- "sphinxcontrib-serializinghtml==1.1.5",
- "sphinxcontrib-spelling>=8.0.0",
+ "astroid>=3; python_version >= '3.9'",
+ "checksumdir>=1.2.0; python_version >= '3.9'",
+ "click>=8.1.8; python_version >= '3.9'",
+ "docutils>=0.21; python_version >= '3.9'",
+ "sphinx-airflow-theme>=0.1.0; python_version >= '3.9'",
+ "sphinx-argparse>=0.4.0; python_version >= '3.9'",
+ "sphinx-autoapi>=3; python_version >= '3.9'",
+ "sphinx-copybutton>=0.5.2; python_version >= '3.9'",
+ "sphinx-design>=0.5.0; python_version >= '3.9'",
+ "sphinx-jinja>=2.0.2; python_version >= '3.9'",
+ "sphinx-rtd-theme>=2.0.0; python_version >= '3.9'",
+ "sphinx>=7; python_version >= '3.9'",
+ "sphinxcontrib-applehelp>=1.0.4; python_version >= '3.9'",
+ "sphinxcontrib-devhelp>=1.0.2; python_version >= '3.9'",
+ "sphinxcontrib-htmlhelp>=2.0.1; python_version >= '3.9'",
+ "sphinxcontrib-httpdomain>=1.8.1; python_version >= '3.9'",
+ "sphinxcontrib-jquery>=4.1; python_version >= '3.9'",
+ "sphinxcontrib-jsmath>=1.0.1; python_version >= '3.9'",
+ "sphinxcontrib-qthelp>=1.0.3; python_version >= '3.9'",
+ "sphinxcontrib-redoc>=1.6.0; python_version >= '3.9'",
+ "sphinxcontrib-serializinghtml>=1.1.5; python_version >= '3.9'",
+ "sphinxcontrib-spelling>=8.0.0; python_version >= '3.9'",
],
"doc-gen": [
"apache-airflow[doc]",
diff --git a/newsfragments/44751.bugfix.rst b/newsfragments/44751.bugfix.rst
deleted file mode 100644
index c85601d0fe13a..0000000000000
--- a/newsfragments/44751.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-``TriggerRule.ALWAYS`` cannot be utilized within a task-generated mapping, either in bare tasks (fixed in this PR) or mapped task groups (fixed in PR #44368). The issue with doing so is that the task is immediately executed without waiting for the upstream's mapping results, which certainly leads to failure of the task. This fix avoids it by raising an exception when it is detected during DAG parsing.
diff --git a/newsfragments/44912.bugfix.rst b/newsfragments/44912.bugfix.rst
deleted file mode 100644
index 6d19c5223f564..0000000000000
--- a/newsfragments/44912.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-Fix the short circuit operator in mapped tasks. The operator did not work until now due to a bug in ``NotPreviouslySkippedDep``. Please note that at the time of merging, this fix has been applied only for Airflow versions > 2.10.4 and < 3, and should be ported to v3 after merging PR #44925.
diff --git a/newsfragments/44937.bugfix.rst b/newsfragments/44937.bugfix.rst
deleted file mode 100644
index d50da4de82fc9..0000000000000
--- a/newsfragments/44937.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-Fix premature evaluation of tasks in mapped task groups. The bug originates in ``TriggerRuleDep`` when dealing with a ``TriggerRule`` that is triggered early (i.e., ``ONE_FAILED``, ``ONE_SUCCESS``, or ``ONE_DONE``). Please note that at the time of merging, this fix has been applied only for Airflow versions > 2.10.4 and < 3, and should be ported to v3 after merging PR #40460.
diff --git a/newsfragments/44938.bugfix.rst b/newsfragments/44938.bugfix.rst
deleted file mode 100644
index 4e6746b223d1d..0000000000000
--- a/newsfragments/44938.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-Fix task_id validation in BaseOperator (#44938)
diff --git a/newsfragments/44968.misc.rst b/newsfragments/44968.misc.rst
deleted file mode 100644
index 160ccd60855af..0000000000000
--- a/newsfragments/44968.misc.rst
+++ /dev/null
@@ -1 +0,0 @@
-The ``conf`` variable, which provided access to the full Airflow configuration (``airflow.cfg``), has been deprecated and will be removed in Airflow 3 from the Task (Jinja2) template context for security and simplicity. If you need specific configuration values in your tasks, retrieve them explicitly in your DAG or task code using the ``airflow.configuration.conf`` module. For users retrieving the webserver URL (e.g., to include log links in tasks or callbacks), which is one of the most common use cases, use the ``ti.log_url`` property available in the ``TaskInstance`` context instead.
diff --git a/newsfragments/45134.bugfix.rst b/newsfragments/45134.bugfix.rst
deleted file mode 100644
index 09aaae23a3487..0000000000000
--- a/newsfragments/45134.bugfix.rst
+++ /dev/null
@@ -1 +0,0 @@
-(v2 API & UI) Allow fetching XCom with forward slash from the API and escape it in the UI
diff --git a/newsfragments/45530.significant.rst b/newsfragments/45530.significant.rst
deleted file mode 100644
index 7e2ae8e8ac6a5..0000000000000
--- a/newsfragments/45530.significant.rst
+++ /dev/null
@@ -1,12 +0,0 @@
-Ensure teardown tasks are executed when DAG run is set to failed
-
-Previously, when a DAG run was manually set to the "failed" or "success" state, the terminal state was applied to all tasks.
-But this was a gap for cases where setup and teardown tasks were defined: if teardown was used to clean up infrastructure
-or other resources, those tasks were also skipped and thus resources could stay allocated.
-
-As of now, when setup tasks have been executed before and the DAG is manually set to "failed" or "success", teardown
-tasks are executed. Teardown tasks are skipped if the setup was also skipped.
-
-As a side effect, this means that if the DAG contains teardown tasks, manually marking the DAG as "failed" or "success"
-will keep the DAG in the running state to ensure that teardown tasks are scheduled. They would not be scheduled
-if the DAG were directly set to "failed" or "success".
diff --git a/scripts/ci/install_breeze.sh b/scripts/ci/install_breeze.sh
index 093c8f6db9ce5..5259628a579d3 100755
--- a/scripts/ci/install_breeze.sh
+++ b/scripts/ci/install_breeze.sh
@@ -21,8 +21,8 @@ cd "$( dirname "${BASH_SOURCE[0]}" )/../../"
PYTHON_ARG=""
-PIP_VERSION="24.3.1"
-UV_VERSION="0.5.17"
+PIP_VERSION="25.0"
+UV_VERSION="0.5.24"
if [[ ${PYTHON_VERSION=} != "" ]]; then
PYTHON_ARG="--python=$(which python"${PYTHON_VERSION}") "
fi
diff --git a/scripts/ci/pre_commit/common_precommit_utils.py b/scripts/ci/pre_commit/common_precommit_utils.py
index f406c8116622d..1fe162421fdf0 100644
--- a/scripts/ci/pre_commit/common_precommit_utils.py
+++ b/scripts/ci/pre_commit/common_precommit_utils.py
@@ -30,7 +30,7 @@
AIRFLOW_SOURCES_ROOT_PATH = Path(__file__).parents[3].resolve()
AIRFLOW_BREEZE_SOURCES_PATH = AIRFLOW_SOURCES_ROOT_PATH / "dev" / "breeze"
-DEFAULT_PYTHON_MAJOR_MINOR_VERSION = "3.8"
+DEFAULT_PYTHON_MAJOR_MINOR_VERSION = "3.9"
console = Console(width=400, color_system="standard")
@@ -223,12 +223,12 @@ def validate_cmd_result(cmd_result, include_ci_env_check=False):
"\n[yellow]If you see strange stacktraces above, especially about missing imports "
"run this command:[/]\n"
)
- console.print("[magenta]breeze ci-image build --python 3.8 --upgrade-to-newer-dependencies[/]\n")
+ console.print("[magenta]breeze ci-image build --python 3.9 --upgrade-to-newer-dependencies[/]\n")
elif cmd_result.returncode != 0:
console.print(
"[warning]\nIf you see strange stacktraces above, "
- "run `breeze ci-image build --python 3.8` and try again."
+ "run `breeze ci-image build --python 3.9` and try again."
)
sys.exit(cmd_result.returncode)
diff --git a/scripts/ci/pre_commit/supported_versions.py b/scripts/ci/pre_commit/supported_versions.py
index 8524f237dc993..a3a96abb957c0 100755
--- a/scripts/ci/pre_commit/supported_versions.py
+++ b/scripts/ci/pre_commit/supported_versions.py
@@ -27,7 +27,7 @@
HEADERS = ("Version", "Current Patch/Minor", "State", "First Release", "Limited Support", "EOL/Terminated")
SUPPORTED_VERSIONS = (
- ("2", "2.10.4", "Supported", "Dec 17, 2020", "TBD", "TBD"),
+ ("2", "2.10.5", "Supported", "Dec 17, 2020", "TBD", "TBD"),
("1.10", "1.10.15", "EOL", "Aug 27, 2018", "Dec 17, 2020", "June 17, 2021"),
("1.9", "1.9.0", "EOL", "Jan 03, 2018", "Aug 27, 2018", "Aug 27, 2018"),
("1.8", "1.8.2", "EOL", "Mar 19, 2017", "Jan 03, 2018", "Jan 03, 2018"),
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index dc77648784ce5..7b63aca840f01 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import json
import urllib
from datetime import timedelta
from unittest import mock
@@ -25,6 +26,7 @@
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.datasets import Dataset
+from airflow.models import Log
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetEvent, DatasetModel
@@ -1729,6 +1731,60 @@ def test_should_respond_200(self, state, run_type, dag_maker, session):
"note": None,
}
+ @pytest.mark.parametrize("state", ["failed", "success", "queued"])
+ @pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
+ def test_action_logging(self, state, run_type, dag_maker, session):
+ dag_id = "TEST_DAG_ID"
+ dag_run_id = "TEST_DAG_RUN_ID"
+ with dag_maker(dag_id) as dag:
+ task = EmptyOperator(task_id="task_id", dag=dag)
+ self.app.dag_bag.bag_dag(dag, root_dag=dag)
+ dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type)
+ ti = dr.get_task_instance(task_id="task_id")
+ ti.task = task
+ ti.state = State.RUNNING
+ session.merge(ti)
+ session.commit()
+
+ request_json = {"state": state}
+
+ self.client.patch(
+ f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
+ json=request_json,
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+
+ log = (
+ session.query(Log)
+ .filter(
+ Log.dag_id == dag_id,
+ Log.run_id == dag_run_id,
+ Log.event == "api.update_dag_run_state",
+ )
+ .order_by(Log.id.desc())
+ .first()
+ )
+ assert log.extra == json.dumps(request_json)
+
+ self.client.patch(
+ f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
+ json=request_json,
+ environ_overrides={"REMOTE_USER": "test"},
+ headers={"content-type": "application/json; charset=utf-8"},
+ )
+
+ log = (
+ session.query(Log)
+ .filter(
+ Log.dag_id == dag_id,
+ Log.run_id == dag_run_id,
+ Log.event == "api.update_dag_run_state",
+ )
+ .order_by(Log.id.desc())
+ .first()
+ )
+ assert log.extra == json.dumps(request_json)
+
def test_schema_validation_error_raises(self, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index 76b2a09603609..cafdc8979ecd2 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -27,6 +27,7 @@
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
+from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.state import DagRunState
@@ -62,7 +63,7 @@ def configured_app(minimal_app_for_api):
delete_user(app, username="test_no_permissions") # type: ignore
-class TestGetExtraLinks:
+class BaseGetExtraLinks:
@pytest.fixture(autouse=True)
def setup_attrs(self, configured_app, session) -> None:
self.default_time = timezone.datetime(2020, 1, 1)
@@ -72,7 +73,7 @@ def setup_attrs(self, configured_app, session) -> None:
self.app = configured_app
- self.dag = self._create_dag()
+ self.dag = self._create_dag() # type: ignore
self.app.dag_bag = DagBag(os.devnull, include_examples=False)
self.app.dag_bag.dags = {self.dag.dag_id: self.dag} # type: ignore
@@ -94,6 +95,8 @@ def teardown_method(self) -> None:
clear_db_runs()
clear_db_xcom()
+
+class TestGetExtraLinks(BaseGetExtraLinks):
def _create_dag(self):
with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag:
CustomOperator(task_id="TEST_SINGLE_LINK", bash_command="TEST_LINK_VALUE")
@@ -241,3 +244,60 @@ class AirflowTestPlugin(AirflowPlugin):
"TEST_DAG_ID/TEST_SINGLE_LINK/2020-01-01T00%3A00%3A00%2B00%3A00"
),
} == response.json
+
+
+class TestMappedTaskExtraLinks(BaseGetExtraLinks):
+ def _create_dag(self):
+ with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag:
+ # Mapped task expanded over a list of bash_commands
+ CustomOperator.partial(task_id="TEST_MAPPED_TASK").expand(
+ bash_command=["TEST_LINK_VALUE_3", "TEST_LINK_VALUE_4"]
+ )
+ return SerializedBaseOperator.deserialize(SerializedBaseOperator.serialize(dag))
+
+ @pytest.mark.parametrize(
+ "map_index, expected_status, expected_json",
+ [
+ (
+ 0,
+ 200,
+ {
+ "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_3",
+ "google": "https://www.google.com",
+ },
+ ),
+ (
+ 1,
+ 200,
+ {
+ "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_4",
+ "google": "https://www.google.com",
+ },
+ ),
+ (6, 404, {"detail": 'DAG Run with ID = "TEST_DAG_RUN_ID" not found'}),
+ ],
+ )
+ @mock_plugin_manager(plugins=[])
+ def test_mapped_task_links(self, map_index, expected_status, expected_json):
+ """Parameterized test for mapped task extra links."""
+ # Set XCom data for different map indices
+ if map_index < 2:
+ XCom.set(
+ key="search_query",
+ value=f"TEST_LINK_VALUE_{map_index + 3}",
+ task_id="TEST_MAPPED_TASK",
+ dag_id="TEST_DAG_ID",
+ run_id="TEST_DAG_RUN_ID",
+ map_index=map_index,
+ )
+
+ response = self.client.get(
+ f"/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index={map_index}",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+
+ assert response.status_code == expected_status
+ if map_index < 2:
+ assert response.json == expected_json
+ else:
+ assert response.json["detail"] == expected_json["detail"]
diff --git a/tests/conftest.py b/tests/conftest.py
index 6d064fa0a9bd2..6c17c6f4036d4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1487,3 +1487,15 @@ def clean_dags_and_dagruns():
yield # Test runs here
clear_db_dags()
clear_db_runs()
+
+
+@pytest.fixture
+def clean_executor_loader():
+ """Clean the executor_loader state, as it stores global variables in the module, causing side effects for some tests."""
+ from airflow.executors.executor_loader import ExecutorLoader
+ from tests.test_utils.executor_loader import clean_executor_loader_module
+
+ clean_executor_loader_module()
+ yield # Test runs here
+ clean_executor_loader_module()
+ ExecutorLoader.init_executors()
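The `clean_executor_loader` fixture above resets the module-level caches kept in `airflow.executors.executor_loader`, replacing the `reload(executor_loader)` pattern removed from the tests below. A minimal usage sketch, with a hypothetical test class modeled on the updated tests later in this diff:

```python
# Hypothetical usage sketch of the new fixture; the class and test names are illustrative.
import pytest

from airflow.executors import executor_loader
from tests.test_utils.config import conf_vars


@pytest.mark.usefixtures("clean_executor_loader")
class TestUsesExecutors:
    def test_default_executor_resolves(self):
        with conf_vars({("core", "executor"): "LocalExecutor"}):
            executor = executor_loader.ExecutorLoader.get_default_executor()
            # Each test starts from a clean executor_loader module state.
            assert executor.__class__.__name__ == "LocalExecutor"
```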
diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py
index 483ef24e25f7f..3a0c33b08b6ab 100644
--- a/tests/core/test_settings.py
+++ b/tests/core/test_settings.py
@@ -31,7 +31,7 @@
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException
-from airflow.settings import _ENABLE_AIP_44, TracebackSession, is_usage_data_collection_enabled
+from airflow.settings import _ENABLE_AIP_44, TracebackSession
from airflow.utils.session import create_session
from tests.test_utils.config import conf_vars
@@ -368,26 +368,3 @@ def test_create_session_ctx_mgr_no_call_methods(mock_new, clear_internal_api):
assert session == m
method_calls = [x[0] for x in m.method_calls]
assert method_calls == [] # commit and close not called when using internal API
-
-
-@pytest.mark.parametrize(
- "env_var, conf_setting, is_enabled",
- [
- ("false", "True", False), # env forces disable
- ("false", "False", False), # Both force disable
- ("False ", "False", False), # Both force disable
- ("true", "True", True), # Both enable
- ("true", "False", False), # Conf forces disable
- (None, "True", True), # Default env, conf enables
- (None, "False", False), # Default env, conf disables
- ],
-)
-def test_usage_data_collection_disabled(env_var, conf_setting, is_enabled, clear_internal_api):
- conf_patch = conf_vars({("usage_data_collection", "enabled"): conf_setting})
-
- if env_var is not None:
- with conf_patch, patch.dict(os.environ, {"SCARF_ANALYTICS": env_var}):
- assert is_usage_data_collection_enabled() == is_enabled
- else:
- with conf_patch:
- assert is_usage_data_collection_enabled() == is_enabled
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index 2192487a01cf8..dc60b9cc507ae 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -17,7 +17,6 @@
from __future__ import annotations
from contextlib import nullcontext
-from importlib import reload
from unittest import mock
import pytest
@@ -25,7 +24,7 @@
from airflow import plugins_manager
from airflow.exceptions import AirflowConfigException
from airflow.executors import executor_loader
-from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
+from airflow.executors.executor_loader import ConnectorSource, ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -50,24 +49,12 @@ class FakePlugin(plugins_manager.AirflowPlugin):
executors = [FakeExecutor]
+@pytest.mark.usefixtures("clean_executor_loader")
class TestExecutorLoader:
- def setup_method(self) -> None:
- from airflow.executors import executor_loader
-
- reload(executor_loader)
- global ExecutorLoader
- ExecutorLoader = executor_loader.ExecutorLoader # type: ignore
-
- def teardown_method(self) -> None:
- from airflow.executors import executor_loader
-
- reload(executor_loader)
- ExecutorLoader.init_executors()
-
def test_no_executor_configured(self):
with conf_vars({("core", "executor"): None}):
with pytest.raises(AirflowConfigException, match=r".*not found in config$"):
- ExecutorLoader.get_default_executor()
+ executor_loader.ExecutorLoader.get_default_executor()
@pytest.mark.parametrize(
"executor_name",
@@ -81,18 +68,20 @@ def test_no_executor_configured(self):
)
def test_should_support_executor_from_core(self, executor_name):
with conf_vars({("core", "executor"): executor_name}):
- executor = ExecutorLoader.get_default_executor()
+ executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert executor_name == executor.__class__.__name__
assert executor.name is not None
- assert executor.name == ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
+ assert executor.name == ExecutorName(
+ executor_loader.ExecutorLoader.executors[executor_name], alias=executor_name
+ )
assert executor.name.connector_source == ConnectorSource.CORE
@mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
@mock.patch("airflow.plugins_manager.executors_modules", None)
def test_should_support_plugins(self):
with conf_vars({("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
- executor = ExecutorLoader.get_default_executor()
+ executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert "FakeExecutor" == executor.__class__.__name__
assert executor.name is not None
@@ -101,7 +90,7 @@ def test_should_support_plugins(self):
def test_should_support_custom_path(self):
with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
- executor = ExecutorLoader.get_default_executor()
+ executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert "FakeExecutor" == executor.__class__.__name__
assert executor.name is not None
@@ -172,17 +161,17 @@ def test_should_support_custom_path(self):
)
def test_get_hybrid_executors_from_config(self, executor_config, expected_executors_list):
with conf_vars({("core", "executor"): executor_config}):
- executors = ExecutorLoader._get_executor_names()
+ executors = executor_loader.ExecutorLoader._get_executor_names()
assert executors == expected_executors_list
def test_init_executors(self):
with conf_vars({("core", "executor"): "CeleryExecutor"}):
- executors = ExecutorLoader.init_executors()
- executor_name = ExecutorLoader.get_default_executor_name()
+ executors = executor_loader.ExecutorLoader.init_executors()
+ executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
assert len(executors) == 1
assert isinstance(executors[0], CeleryExecutor)
- assert "CeleryExecutor" in ExecutorLoader.executors
- assert ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
+ assert "CeleryExecutor" in executor_loader.ExecutorLoader.executors
+ assert executor_loader.ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
assert isinstance(executor_loader._loaded_executors[executor_name], CeleryExecutor)
@pytest.mark.parametrize(
@@ -202,7 +191,7 @@ def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_
with pytest.raises(
AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
):
- ExecutorLoader._get_executor_names()
+ executor_loader.ExecutorLoader._get_executor_names()
@pytest.mark.parametrize(
"executor_config",
@@ -218,7 +207,7 @@ def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_
def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
with pytest.raises(AirflowConfigException):
- ExecutorLoader._get_executor_names()
+ executor_loader.ExecutorLoader._get_executor_names()
@pytest.mark.parametrize(
("executor_config", "expected_value"),
@@ -234,7 +223,7 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
)
def test_should_support_import_executor_from_core(self, executor_config, expected_value):
with conf_vars({("core", "executor"): executor_config}):
- executor, import_source = ExecutorLoader.import_default_executor_cls()
+ executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
assert expected_value == executor.__name__
assert import_source == ConnectorSource.CORE
@@ -249,7 +238,7 @@ def test_should_support_import_executor_from_core(self, executor_config, expecte
)
def test_should_support_import_plugins(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
- executor, import_source = ExecutorLoader.import_default_executor_cls()
+ executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
assert "FakeExecutor" == executor.__name__
assert import_source == ConnectorSource.PLUGIN
@@ -263,7 +252,7 @@ def test_should_support_import_plugins(self, executor_config):
)
def test_should_support_import_custom_path(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
- executor, import_source = ExecutorLoader.import_default_executor_cls()
+ executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
assert "FakeExecutor" == executor.__name__
assert import_source == ConnectorSource.CUSTOM_PATH
@@ -272,7 +261,7 @@ def test_should_support_import_custom_path(self, executor_config):
@pytest.mark.parametrize("executor", [FakeExecutor, FakeSingleThreadedExecutor])
def test_validate_database_executor_compatibility_general(self, monkeypatch, executor):
monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
- ExecutorLoader.validate_database_executor_compatibility(executor)
+ executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)
@pytest.mark.db_test
@pytest.mark.backend("sqlite")
@@ -290,24 +279,32 @@ def test_validate_database_executor_compatibility_general(self, monkeypatch, exe
def test_validate_database_executor_compatibility_sqlite(self, monkeypatch, executor, expectation):
monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
with expectation:
- ExecutorLoader.validate_database_executor_compatibility(executor)
+ executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)
def test_load_executor(self):
with conf_vars({("core", "executor"): "LocalExecutor"}):
- ExecutorLoader.init_executors()
- assert isinstance(ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
- assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
- assert isinstance(ExecutorLoader.load_executor(None), LocalExecutor)
+ executor_loader.ExecutorLoader.init_executors()
+ assert isinstance(executor_loader.ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
+ assert isinstance(
+ executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+ LocalExecutor,
+ )
+ assert isinstance(executor_loader.ExecutorLoader.load_executor(None), LocalExecutor)
def test_load_executor_alias(self):
with conf_vars({("core", "executor"): "local_exec:airflow.executors.local_executor.LocalExecutor"}):
- ExecutorLoader.init_executors()
- assert isinstance(ExecutorLoader.load_executor("local_exec"), LocalExecutor)
+ executor_loader.ExecutorLoader.init_executors()
+ assert isinstance(executor_loader.ExecutorLoader.load_executor("local_exec"), LocalExecutor)
+ assert isinstance(
+ executor_loader.ExecutorLoader.load_executor(
+ "airflow.executors.local_executor.LocalExecutor"
+ ),
+ LocalExecutor,
+ )
assert isinstance(
- ExecutorLoader.load_executor("airflow.executors.local_executor.LocalExecutor"),
+ executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
LocalExecutor,
)
- assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor", autospec=True)
def test_load_custom_executor_with_classname(self, mock_executor):
@@ -319,15 +316,16 @@ def test_load_custom_executor_with_classname(self, mock_executor):
): "my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
}
):
- ExecutorLoader.init_executors()
- assert isinstance(ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
- assert isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
+ executor_loader.ExecutorLoader.init_executors()
+ assert isinstance(executor_loader.ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
+ assert isinstance(executor_loader.ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
assert isinstance(
- ExecutorLoader.load_executor(
+ executor_loader.ExecutorLoader.load_executor(
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
),
AwsEcsExecutor,
)
assert isinstance(
- ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
+ executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+ AwsEcsExecutor,
)
diff --git a/tests/models/test_baseoperatormeta.py b/tests/models/test_baseoperatormeta.py
index 5244e86b2c386..52e45dd1cf325 100644
--- a/tests/models/test_baseoperatormeta.py
+++ b/tests/models/test_baseoperatormeta.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import threading
from typing import TYPE_CHECKING, Any
from unittest.mock import patch
@@ -211,3 +212,20 @@ def say_hello(**context):
mock_log.warning.assert_called_once_with(
"HelloWorldOperator.execute cannot be called outside TaskInstance!"
)
+
+ def test_thread_local_executor_safeguard(self):
+ class TestExecutorSafeguardThread(threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.executor_safeguard = ExecutorSafeguard()
+
+ def run(self):
+ class Wrapper:
+ def wrapper_test_func(self, *args, **kwargs):
+ print("test")
+
+ wrap_func = self.executor_safeguard.decorator(Wrapper.wrapper_test_func)
+ wrap_func(Wrapper(), Wrapper__sentinel="abc")
+
+ # Test thread local caller value is set properly
+ TestExecutorSafeguardThread().start()
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f23eac76b6e93..5f721b61d2691 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -28,7 +28,6 @@
import weakref
from contextlib import redirect_stdout
from datetime import timedelta
-from importlib import reload
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
@@ -56,7 +55,6 @@
RemovedInAirflow3Warning,
UnknownExecutorException,
)
-from airflow.executors import executor_loader
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models.baseoperator import BaseOperator
@@ -3324,10 +3322,10 @@ def test_dataset_expression(self, session: Session) -> None:
]
}
+ @pytest.mark.usefixtures("clean_executor_loader")
@mock.patch("airflow.models.dag.run_job")
def test_dag_executors(self, run_job_mock):
dag = DAG(dag_id="test", schedule=None)
- reload(executor_loader)
with conf_vars({("core", "executor"): "SequentialExecutor"}):
dag.run()
assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)
diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py
index 2426352fc8531..2e7ddd9bac848 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -28,6 +28,7 @@
import pytest
+from airflow.exceptions import RemovedInAirflow3Warning
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import get_listener_manager
from airflow.plugins_manager import AirflowPlugin
@@ -174,6 +175,11 @@ def clean_plugins(self):
plugins_manager.loaded_plugins = set()
plugins_manager.plugins = []
+ yield
+ plugins_manager.loaded_plugins = set()
+
+ plugins_manager.registered_ti_dep_classes = None
+ plugins_manager.plugins = None
def test_no_log_when_no_plugins(self, caplog):
with mock_plugin_manager(plugins=[]):
@@ -270,6 +276,17 @@ class AirflowAdminMenuLinksPlugin(AirflowPlugin):
),
]
+ def test_deprecate_ti_deps(self):
+ class DeprecatedTIDeps(AirflowPlugin):
+ name = "ti_deps"
+
+ ti_deps = [mock.MagicMock()]
+
+ with mock_plugin_manager(plugins=[DeprecatedTIDeps()]), pytest.warns(RemovedInAirflow3Warning):
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_ti_deps_plugins()
+
def test_should_not_warning_about_fab_plugins(self, caplog):
class AirflowAdminViewsPlugin(AirflowPlugin):
name = "test_admin_views_plugin"
diff --git a/tests/providers/amazon/aws/system/utils/test_helpers.py b/tests/providers/amazon/aws/system/utils/test_helpers.py
index f48de1788b74c..3af3720688a09 100644
--- a/tests/providers/amazon/aws/system/utils/test_helpers.py
+++ b/tests/providers/amazon/aws/system/utils/test_helpers.py
@@ -24,7 +24,7 @@
import os
import sys
from io import StringIO
-from unittest.mock import ANY, patch
+from unittest.mock import patch
import pytest
from moto import mock_aws
@@ -79,8 +79,15 @@ def test_fetch_variable_success(
) -> None:
mock_getenv.return_value = env_value or ssm_value
- result = utils.fetch_variable(ANY, default_value) if default_value else utils.fetch_variable(ANY_STR)
+ utils._fetch_from_ssm.cache_clear()
+ result = (
+ utils.fetch_variable("some_key", default_value)
+ if default_value
+ else utils.fetch_variable(ANY_STR)
+ )
+
+ utils._fetch_from_ssm.cache_clear()
assert result == expected_result
def test_fetch_variable_no_value_found_raises_exception(self):
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d7f09c20ff9d3..58f16d80f8c89 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -408,6 +408,21 @@ def timetable_plugin(monkeypatch):
)
+@pytest.fixture
+def custom_ti_dep(monkeypatch):
+    """Patch the plugins manager to always and only return our custom TI dep."""
+ from test_plugin import CustomTestTriggerRule
+
+ from airflow import plugins_manager
+
+ monkeypatch.setattr(plugins_manager, "initialize_ti_deps_plugins", lambda: None)
+ monkeypatch.setattr(
+ plugins_manager,
+ "registered_ti_dep_classes",
+ {"test_plugin.CustomTestTriggerRule": CustomTestTriggerRule},
+ )
+
+
# TODO: (potiuk) - AIP-44 - check why this test hangs
@pytest.mark.skip_if_database_isolation_mode
class TestStringifiedDAGs:
@@ -430,6 +445,7 @@ def setup_test_cases(self):
)
@pytest.mark.db_test
+ @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialization(self):
"""Serialization and deserialization should work for every DAG and Operator."""
dags = collect_dags()
@@ -539,6 +555,7 @@ def sorted_serialized_dag(dag_dict: dict):
return actual, expected
@pytest.mark.db_test
+ @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_deserialization_across_process(self):
"""A serialized DAG can be deserialized in another process."""
@@ -1596,6 +1613,7 @@ def test_deps_sorted(self):
"airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep",
]
+ @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_error_on_unregistered_ti_dep_serialization(self):
# trigger rule not registered through the plugin system will not be serialized
class DummyTriggerRule(BaseTIDep):
@@ -1634,6 +1652,8 @@ def test_error_on_unregistered_ti_dep_deserialization(self):
SerializedBaseOperator.deserialize_operator(serialize_op)
@pytest.mark.db_test
+ @pytest.mark.usefixtures("custom_ti_dep")
+ @pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialize_and_deserialize_custom_ti_deps(self):
from test_plugin import CustomTestTriggerRule
diff --git a/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py b/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py
index fcebc8c40a0d4..2b7bce2fecde8 100644
--- a/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py
+++ b/tests/system/providers/amazon/aws/example_bedrock_retrieve_and_generate.py
@@ -127,7 +127,7 @@ def create_opensearch_policies(bedrock_role_arn: str, collection_name: str, poli
def _create_security_policy(name, policy_type, policy):
try:
- aoss_client.create_security_policy(name=name, policy=json.dumps(policy), type=policy_type)
+ aoss_client.conn.create_security_policy(name=name, policy=json.dumps(policy), type=policy_type)
except ClientError as e:
if e.response["Error"]["Code"] == "ConflictException":
log.info("OpenSearch security policy %s already exists.", name)
@@ -135,7 +135,7 @@ def _create_security_policy(name, policy_type, policy):
def _create_access_policy(name, policy_type, policy):
try:
- aoss_client.create_access_policy(name=name, policy=json.dumps(policy), type=policy_type)
+ aoss_client.conn.create_access_policy(name=name, policy=json.dumps(policy), type=policy_type)
except ClientError as e:
if e.response["Error"]["Code"] == "ConflictException":
log.info("OpenSearch data access policy %s already exists.", name)
@@ -204,9 +204,9 @@ def create_collection(collection_name: str):
:param collection_name: The name of the Collection to create.
"""
log.info("\nCreating collection: %s.", collection_name)
- return aoss_client.create_collection(name=collection_name, type="VECTORSEARCH")["createCollectionDetail"][
- "id"
- ]
+ return aoss_client.conn.create_collection(name=collection_name, type="VECTORSEARCH")[
+ "createCollectionDetail"
+ ]["id"]
@task
@@ -317,7 +317,7 @@ def get_collection_arn(collection_id: str):
"""
return next(
colxn["arn"]
- for colxn in aoss_client.list_collections()["collectionSummaries"]
+ for colxn in aoss_client.conn.list_collections()["collectionSummaries"]
if colxn["id"] == collection_id
)
@@ -336,7 +336,9 @@ def delete_data_source(knowledge_base_id: str, data_source_id: str):
:param data_source_id: The unique identifier of the data source to delete.
"""
log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
- bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)
+ bedrock_agent_client.conn.delete_data_source(
+ dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id
+ )
# [END howto_operator_bedrock_delete_data_source]
@@ -355,7 +357,7 @@ def delete_knowledge_base(knowledge_base_id: str):
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
"""
log.info("Deleting Knowledge Base %s.", knowledge_base_id)
- bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
+ bedrock_agent_client.conn.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
# [END howto_operator_bedrock_delete_knowledge_base]
@@ -393,7 +395,7 @@ def delete_collection(collection_id: str):
:param collection_id: ID of the collection to be indexed.
"""
log.info("Deleting collection %s.", collection_id)
- aoss_client.delete_collection(id=collection_id)
+ aoss_client.conn.delete_collection(id=collection_id)
@task(trigger_rule=TriggerRule.ALL_DONE)
@@ -404,7 +406,7 @@ def delete_opensearch_policies(collection_name: str):
:param collection_name: All policies in the given collection name will be deleted.
"""
- access_policies = aoss_client.list_access_policies(
+ access_policies = aoss_client.conn.list_access_policies(
type="data", resource=[f"collection/{collection_name}"]
)["accessPolicySummaries"]
log.info("Found access policies for %s: %s", collection_name, access_policies)
@@ -412,10 +414,10 @@ def delete_opensearch_policies(collection_name: str):
raise Exception("No access policies found?")
for policy in access_policies:
log.info("Deleting access policy for %s: %s", collection_name, policy["name"])
- aoss_client.delete_access_policy(name=policy["name"], type="data")
+ aoss_client.conn.delete_access_policy(name=policy["name"], type="data")
for policy_type in ["encryption", "network"]:
- policies = aoss_client.list_security_policies(
+ policies = aoss_client.conn.list_security_policies(
type=policy_type, resource=[f"collection/{collection_name}"]
)["securityPolicySummaries"]
if not policies:
@@ -423,7 +425,7 @@ def delete_opensearch_policies(collection_name: str):
log.info("Found %s security policies for %s: %s", policy_type, collection_name, policies)
for policy in policies:
log.info("Deleting %s security policy for %s: %s", policy_type, collection_name, policy["name"])
- aoss_client.delete_security_policy(name=policy["name"], type=policy_type)
+ aoss_client.conn.delete_security_policy(name=policy["name"], type=policy_type)
with DAG(
@@ -436,8 +438,8 @@ def delete_opensearch_policies(collection_name: str):
test_context = sys_test_context_task()
env_id = test_context["ENV_ID"]
- aoss_client = OpenSearchServerlessHook(aws_conn_id=None).conn
- bedrock_agent_client = BedrockAgentHook(aws_conn_id=None).conn
+ aoss_client = OpenSearchServerlessHook(aws_conn_id=None)
+ bedrock_agent_client = BedrockAgentHook(aws_conn_id=None)
region_name = boto3.session.Session().region_name
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index 8b4114fc90ad0..411f92ab7bf3a 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import functools
import inspect
import json
import logging
@@ -92,6 +93,7 @@ def _validate_env_id(env_id: str) -> str:
return env_id.lower()
+@functools.cache
def _fetch_from_ssm(key: str, test_name: str | None = None) -> str:
"""
Test values are stored in the SSM Value as a JSON-encoded dict of key/value pairs.
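Because `functools.cache` memoizes `_fetch_from_ssm` for the lifetime of the process, the updated test above wraps its call with `_fetch_from_ssm.cache_clear()`. A hedged, self-contained sketch of that caching pattern (a toy function, not the project's code):

```python
# Toy illustration of functools.cache memoization and cache_clear(); not Airflow code.
import functools

calls = 0


@functools.cache
def fetch(key: str) -> str:
    # Stand-in for an expensive lookup such as an SSM call.
    global calls
    calls += 1
    return f"value-for-{key}"


assert fetch("alpha") == "value-for-alpha"
assert fetch("alpha") == "value-for-alpha"  # second call served from the cache
assert calls == 1

fetch.cache_clear()  # what the test does to avoid cross-test leakage
assert fetch("alpha") == "value-for-alpha"
assert calls == 2
```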
diff --git a/tests/test_utils/executor_loader.py b/tests/test_utils/executor_loader.py
new file mode 100644
index 0000000000000..cc28223b7ce78
--- /dev/null
+++ b/tests/test_utils/executor_loader.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import airflow.executors.executor_loader as executor_loader
+
+if TYPE_CHECKING:
+ from airflow.executors.executor_utils import ExecutorName
+
+
+def clean_executor_loader_module():
+ """Clean the executor_loader state, as it stores global variables in the module, causing side effects for some tests."""
+ executor_loader._alias_to_executors: dict[str, ExecutorName] = {}
+ executor_loader._module_to_executors: dict[str, ExecutorName] = {}
+ executor_loader._classname_to_executors: dict[str, ExecutorName] = {}
+ executor_loader._executor_names: list[ExecutorName] = []
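The tests changed later in this patch request a `clean_executor_loader` fixture; one plausible conftest wiring of the helper above (an assumption, not the repository's exact fixture) is:

```python
import pytest

from tests.test_utils.executor_loader import clean_executor_loader_module


@pytest.fixture
def clean_executor_loader():
    # Assumed wiring: reset the module-level executor caches around each test
    # so one test's executor configuration cannot leak into the next.
    clean_executor_loader_module()
    yield
    clean_executor_loader_module()
```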
diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py
index cd816707a59f5..f254d22484c0f 100644
--- a/tests/test_utils/mock_operators.py
+++ b/tests/test_utils/mock_operators.py
@@ -22,6 +22,7 @@
import attr
from airflow.models.baseoperator import BaseOperator
+from airflow.models.mappedoperator import MappedOperator
from airflow.models.xcom import XCom
from tests.test_utils.compat import BaseOperatorLink
@@ -137,7 +138,11 @@ class CustomOpLink(BaseOperatorLink):
def get_link(self, operator, *, ti_key):
search_query = XCom.get_one(
- task_id=ti_key.task_id, dag_id=ti_key.dag_id, run_id=ti_key.run_id, key="search_query"
+ task_id=ti_key.task_id,
+ dag_id=ti_key.dag_id,
+ run_id=ti_key.run_id,
+ map_index=ti_key.map_index,
+ key="search_query",
)
if not search_query:
return None
@@ -153,7 +158,11 @@ def operator_extra_links(self):
"""
Return operator extra links
"""
- if isinstance(self.bash_command, str) or self.bash_command is None:
+ if (
+ isinstance(self, MappedOperator)
+ or isinstance(self.bash_command, str)
+ or self.bash_command is None
+ ):
return (CustomOpLink(),)
return (CustomBaseIndexOpLink(i) for i, _ in enumerate(self.bash_command))
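Two related fixes here: `CustomOpLink.get_link` now scopes its XCom lookup to the mapped task instance by passing `map_index`, and `operator_extra_links` falls back to the single link when the operator is a `MappedOperator`, whose `bash_command` is not resolved at parse time. A compact sketch of the map-index-aware lookup, using the same `XCom.get_one` keywords as the hunk:

```python
from airflow.models.xcom import XCom


def get_search_query(ti_key):
    # ti_key carries dag_id/task_id/run_id/map_index; without map_index the lookup
    # could return a value written by a different expanded task instance.
    return XCom.get_one(
        task_id=ti_key.task_id,
        dag_id=ti_key.dag_id,
        run_id=ti_key.run_id,
        map_index=ti_key.map_index,
        key="search_query",
    )
```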
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 568d6abf025c7..9241145f7f532 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -48,6 +48,7 @@ def side_effect(*args, **kwargs):
yield m
+@pytest.mark.usefixtures("clean_executor_loader")
class TestNotInReschedulePeriodDep:
@pytest.fixture(autouse=True)
def setup_test_cases(self, request, create_task_instance):
diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py
new file mode 100644
index 0000000000000..19d3ec31d0311
--- /dev/null
+++ b/tests/utils/test_decorators.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from airflow.decorators import task
+
+if TYPE_CHECKING:
+ from airflow.decorators.base import Task, TaskDecorator
+
+_CONDITION_DECORATORS = frozenset({"skip_if", "run_if"})
+_NO_SOURCE_DECORATORS = frozenset({"sensor"})
+DECORATORS = sorted(
+ set(x for x in dir(task) if not x.startswith("_")) - _CONDITION_DECORATORS - _NO_SOURCE_DECORATORS
+)
+DECORATORS_USING_SOURCE = ("external_python", "virtualenv", "branch_virtualenv", "branch_external_python")
+
+
+@pytest.fixture
+def decorator(request: pytest.FixtureRequest) -> TaskDecorator:
+ decorator_factory = getattr(task, request.param)
+
+ kwargs = {}
+ if "external" in request.param:
+ kwargs["python"] = "python3"
+ return decorator_factory(**kwargs)
+
+
+@pytest.mark.parametrize("decorator", DECORATORS_USING_SOURCE, indirect=["decorator"])
+def test_task_decorator_using_source(decorator: TaskDecorator):
+ @decorator
+ def f():
+ return ["some_task"]
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return ["some_task"]\n'
+
+
+@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"])
+def test_skip_if(decorator: TaskDecorator):
+ @task.skip_if(lambda context: True)
+ @decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n'
+
+
+@pytest.mark.parametrize("decorator", DECORATORS, indirect=["decorator"])
+def test_run_if(decorator: TaskDecorator):
+ @task.run_if(lambda context: True)
+ @decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f, "decorator") == 'def f():\n return "hello world"\n'
+
+
+def test_skip_if_and_run_if():
+ @task.skip_if(lambda context: True)
+ @task.run_if(lambda context: True)
+ @task.virtualenv()
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == 'def f():\n return "hello world"\n'
+
+
+def test_run_if_and_skip_if():
+ @task.run_if(lambda context: True)
+ @task.skip_if(lambda context: True)
+ @task.virtualenv()
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == 'def f():\n return "hello world"\n'
+
+
+def test_skip_if_allow_decorator():
+ def non_task_decorator(func):
+ return func
+
+ @task.skip_if(lambda context: True)
+ @task.virtualenv()
+ @non_task_decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n'
+
+
+def test_run_if_allow_decorator():
+ def non_task_decorator(func):
+ return func
+
+ @task.run_if(lambda context: True)
+ @task.virtualenv()
+ @non_task_decorator
+ def f():
+ return "hello world"
+
+ assert parse_python_source(f) == '@non_task_decorator\ndef f():\n return "hello world"\n'
+
+
+def parse_python_source(task: Task, custom_operator_name: str | None = None) -> str:
+ operator = task().operator
+ if custom_operator_name:
+ custom_operator_name = (
+ custom_operator_name if custom_operator_name.startswith("@") else f"@{custom_operator_name}"
+ )
+ operator.__dict__["custom_operator_name"] = custom_operator_name
+ return operator.get_python_source()
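The new test module covers the `@task.skip_if` and `@task.run_if` condition decorators and verifies they leave `get_python_source()` untouched for source-based decorators such as `@task.virtualenv`. A minimal usage sketch (the condition lambda is illustrative):

```python
from airflow.decorators import task


@task.skip_if(lambda context: context["dag_run"].run_id.startswith("manual__"))  # illustrative condition
@task.virtualenv()
def build_report() -> str:
    return "hello world"
```

As the `test_skip_if_and_run_if` and `test_run_if_and_skip_if` cases show, the order in which the two condition decorators are stacked does not matter.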
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index d3651370d657e..95483f2285fa8 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -34,7 +34,7 @@
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.executors import executor_loader
+from airflow.executors import executor_constants, executor_loader
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.dag import DAG
@@ -202,6 +202,95 @@ def task_callable(ti):
# Remove the generated tmp log file.
os.remove(log_filename)
+ @pytest.mark.parametrize(
+ "executor_name",
+ [
+ (executor_constants.LOCAL_KUBERNETES_EXECUTOR),
+ (executor_constants.CELERY_KUBERNETES_EXECUTOR),
+ (executor_constants.KUBERNETES_EXECUTOR),
+ (None),
+ ],
+ )
+ @conf_vars(
+ {
+ ("core", "EXECUTOR"): ",".join(
+ [
+ executor_constants.LOCAL_KUBERNETES_EXECUTOR,
+ executor_constants.CELERY_KUBERNETES_EXECUTOR,
+ executor_constants.KUBERNETES_EXECUTOR,
+ ]
+ ),
+ }
+ )
+ @patch(
+ "airflow.executors.executor_loader.ExecutorLoader.load_executor",
+ wraps=executor_loader.ExecutorLoader.load_executor,
+ )
+ @patch(
+ "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
+ wraps=executor_loader.ExecutorLoader.get_default_executor,
+ )
+ def test_file_task_handler_with_multiple_executors(
+ self,
+ mock_get_default_executor,
+ mock_load_executor,
+ executor_name,
+ create_task_instance,
+ clean_executor_loader,
+ ):
+ executors_mapping = executor_loader.ExecutorLoader.executors
+ default_executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
+ path_to_executor_class: str
+ if executor_name is None:
+ path_to_executor_class = executors_mapping.get(default_executor_name.alias)
+ else:
+ path_to_executor_class = executors_mapping.get(executor_name)
+
+ with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log:
+ mock_get_task_log.return_value = ([], [])
+ ti = create_task_instance(
+ dag_id="dag_for_testing_multiple_executors",
+ task_id="task_for_testing_multiple_executors",
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ )
+ if executor_name is not None:
+ ti.executor = executor_name
+ ti.try_number = 1
+ ti.state = TaskInstanceState.RUNNING
+ logger = ti.log
+ ti.log.disabled = False
+
+ file_handler = next(
+ (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+ )
+ assert file_handler is not None
+
+ set_context(logger, ti)
+ # clear executor_instances cache
+ file_handler.executor_instances = {}
+ assert file_handler.handler is not None
+ # We expect set_context to generate a file locally.
+ log_filename = file_handler.handler.baseFilename
+ assert os.path.isfile(log_filename)
+ assert log_filename.endswith("1.log"), log_filename
+
+ file_handler.flush()
+ file_handler.close()
+
+ assert hasattr(file_handler, "read")
+ file_handler.read(ti)
+ os.remove(log_filename)
+ mock_get_task_log.assert_called_once()
+
+ if executor_name is None:
+ mock_get_default_executor.assert_called_once()
+ # load_executor will be called inside the `ExecutorLoader.get_default_executor` method
+ mock_load_executor.assert_called_once_with(default_executor_name)
+ else:
+ mock_get_default_executor.assert_not_called()
+ mock_load_executor.assert_called_once_with(executor_name)
+
def test_file_task_handler_running(self):
def task_callable(ti):
ti.log.info("test")
@@ -296,6 +385,7 @@ def test__read_from_local(self, tmp_path):
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
)
+ @pytest.mark.usefixtures("clean_executor_loader")
@pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
"""Test for k8s executor, the log is read from get_task_log method"""
@@ -309,6 +399,7 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
)
ti.state = state
ti.triggerer_job = None
+ ti.executor = executor_name
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
@@ -401,11 +492,12 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
],
)
- @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
+ @conf_vars({("core", "executor"): "KubernetesExecutor"})
@patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_read_from_k8s_under_multi_namespace_mode(
self, mock_kube_client, pod_override, namespace_to_call
):
+ reload(executor_loader)
mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
@@ -426,6 +518,7 @@ def task_callable(ti):
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number = 3
+ ti.executor = "KubernetesExecutor"
logger = ti.log
ti.log.disabled = False
@@ -434,6 +527,8 @@ def task_callable(ti):
set_context(logger, ti)
ti.run(ignore_ti_state=True)
ti.state = TaskInstanceState.RUNNING
+ # clear executor_instances cache
+ file_handler.executor_instances = {}
file_handler.read(ti, 2)
# first we find pod name
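The new parametrized test configures three executors at once and asserts that `FileTaskHandler` fetches logs through the executor recorded on the task instance, falling back to the default executor when `ti.executor` is unset. It patches `get_task_log` on the class it resolves from `ExecutorLoader.executors`; a brief sketch of that alias-to-class-path lookup:

```python
from airflow.executors import executor_constants, executor_loader

# Maps an executor alias to the dotted path of its class; the test patches
# `get_task_log` on whichever class corresponds to the parametrized alias.
path_to_executor_class = executor_loader.ExecutorLoader.executors[
    executor_constants.KUBERNETES_EXECUTOR
]
```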
diff --git a/tests/utils/test_usage_data_collection.py b/tests/utils/test_usage_data_collection.py
deleted file mode 100644
index 143bce39eca4d..0000000000000
--- a/tests/utils/test_usage_data_collection.py
+++ /dev/null
@@ -1,104 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import platform
-from unittest import mock
-
-import pytest
-
-from airflow import __version__ as airflow_version
-from airflow.configuration import conf
-from airflow.utils.usage_data_collection import (
- get_database_version,
- get_python_version,
- usage_data_collection,
-)
-
-
-@pytest.mark.parametrize("is_enabled, is_prerelease", [(False, True), (True, True)])
-@mock.patch("httpx.get")
-def test_scarf_analytics_disabled(mock_get, is_enabled, is_prerelease):
- with mock.patch("airflow.settings.is_usage_data_collection_enabled", return_value=is_enabled), mock.patch(
- "airflow.utils.usage_data_collection._version_is_prerelease", return_value=is_prerelease
- ):
- usage_data_collection()
- mock_get.assert_not_called()
-
-
-@mock.patch("airflow.settings.is_usage_data_collection_enabled", return_value=True)
-@mock.patch("airflow.utils.usage_data_collection._version_is_prerelease", return_value=False)
-@mock.patch("airflow.utils.usage_data_collection._is_ci_environ", return_value=False)
-@mock.patch("airflow.utils.usage_data_collection.get_database_version", return_value="12.3")
-@mock.patch("airflow.utils.usage_data_collection.get_database_name", return_value="postgres")
-@mock.patch("httpx.get")
-def test_scarf_analytics(
- mock_get,
- mock_is_usage_data_collection_enabled,
- mock_version_is_ci,
- mock_version_is_prerelease,
- get_database_version,
- get_database_name,
-):
- platform_sys = platform.system()
- platform_machine = platform.machine()
- python_version = get_python_version()
- executor = conf.get("core", "EXECUTOR")
- scarf_endpoint = "https://apacheairflow.gateway.scarf.sh/scheduler"
- usage_data_collection()
-
- expected_scarf_url = (
- f"{scarf_endpoint}?version={airflow_version}"
- f"&python_version={python_version}"
- f"&platform={platform_sys}"
- f"&arch={platform_machine}"
- f"&database=postgres"
- f"&db_version=12.3"
- f"&executor={executor}"
- )
-
- mock_get.assert_called_once_with(expected_scarf_url, timeout=5.0)
-
-
-@pytest.mark.skip_if_database_isolation_mode
-@pytest.mark.db_test
-@pytest.mark.parametrize(
- "version_info, expected_version",
- [
- ((1, 2, 3), "1.2"), # Normal version tuple
- (None, "None"), # No version info available
- ((1,), "1"), # Single element version tuple
- ((1, 2, 3, "beta", 4), "1.2"), # Complex version tuple with strings
- ],
-)
-def test_get_database_version(version_info, expected_version):
- with mock.patch("airflow.settings.engine.dialect.server_version_info", new=version_info):
- assert get_database_version() == expected_version
-
-
-@pytest.mark.parametrize(
- "version_info, expected_version",
- [
- ("1.2.3", "1.2"), # Normal version
- ("4", "4"), # Single element version
- ("1.2.3.beta4", "1.2"), # Complex version tuple with strings
- ],
-)
-def test_get_python_version(version_info, expected_version):
- with mock.patch("platform.python_version", return_value=version_info):
- assert get_python_version() == expected_version