From 073d47b7652dcd408bf6739652153fa7119e9361 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 6 Jul 2025 19:35:21 +0200 Subject: [PATCH 1/3] Ensure Edge Plugin for API endpoint is only loaded on API-Server and AF2 Webserver --- .../providers/edge3/plugins/edge_executor_plugin.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py index 9c8b32eadea33..e5c3481ae8ff5 100644 --- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py +++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py @@ -17,6 +17,7 @@ from __future__ import annotations +import sys from typing import TYPE_CHECKING, Any from airflow.configuration import conf @@ -213,12 +214,18 @@ def change_maintenance_comment(self, worker_name: str): except AirflowConfigException: EDGE_EXECUTOR_ACTIVE = False +# Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x) +if AIRFLOW_V_3_0_PLUS: + RUNNING_ON_APISERVER = sys.argv[1] in ["api-server"] if len(sys.argv) > 1 else False +else: + RUNNING_ON_APISERVER = "gunicorn" in sys.argv[0] and "airflow-webserver" in sys.argv + class EdgeExecutorPlugin(AirflowPlugin): """EdgeExecutor Plugin - provides API endpoints for Edge Workers in Webserver.""" name = "edge_executor" - if EDGE_EXECUTOR_ACTIVE: + if EDGE_EXECUTOR_ACTIVE and RUNNING_ON_APISERVER: if AIRFLOW_V_3_0_PLUS: fastapi_apps = [_get_api_endpoint()] else: From 2266fef982a3e65aeef16ae464b800c165facc17 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 6 Jul 2025 20:25:59 +0200 Subject: [PATCH 2/3] Fix and extend pytests for edge3 plugin --- .../plugins/test_edge_executor_plugin.py | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py index 99e5e24722e77..c5a4c6782f910 100644 --- a/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py +++ b/providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py @@ -17,6 +17,7 @@ from __future__ import annotations import importlib +from unittest.mock import patch import pytest import time_machine @@ -44,17 +45,20 @@ def test_plugin_inactive(): @pytest.mark.db_test -def test_plugin_active(): - with conf_vars({("edge", "api_enabled"): "true"}): +def test_plugin_active_apiserver(): + mock_cli = ["airflow", "api-server"] if AIRFLOW_V_3_0_PLUS else ["gunicorn", "airflow-webserver"] + with conf_vars({("edge", "api_enabled"): "true"}), patch("sys.argv", mock_cli): importlib.reload(edge_executor_plugin) from airflow.providers.edge3.plugins.edge_executor_plugin import ( EDGE_EXECUTOR_ACTIVE, + RUNNING_ON_APISERVER, EdgeExecutorPlugin, ) rep = EdgeExecutorPlugin() assert EDGE_EXECUTOR_ACTIVE + assert RUNNING_ON_APISERVER if AIRFLOW_V_3_0_PLUS: assert len(rep.appbuilder_views) == 0 assert len(rep.flask_blueprints) == 0 @@ -64,6 +68,27 @@ def test_plugin_active(): assert len(rep.flask_blueprints) == 2 +@patch("sys.argv", ["airflow", "some-other-command"]) +def test_plugin_active_non_apiserver(): + with conf_vars({("edge", "api_enabled"): "true"}): + importlib.reload(edge_executor_plugin) + + from airflow.providers.edge3.plugins.edge_executor_plugin import ( + EDGE_EXECUTOR_ACTIVE, + RUNNING_ON_APISERVER, + EdgeExecutorPlugin, + ) + + rep = EdgeExecutorPlugin() + assert EDGE_EXECUTOR_ACTIVE + assert not RUNNING_ON_APISERVER + assert len(rep.appbuilder_views) == 0 + assert len(rep.flask_blueprints) == 0 + assert len(rep.appbuilder_views) == 0 + if AIRFLOW_V_3_0_PLUS: + assert len(rep.fastapi_apps) == 0 + + @pytest.fixture def plugin(): from airflow.providers.edge3.plugins.edge_executor_plugin import EdgeExecutorPlugin From 67791b72279c20ecfaa4174f62f4592f093ebbb1 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 13 Jul 2025 16:32:52 +0200 Subject: [PATCH 3/3] Add a note to clean bad init code up --- .../airflow/providers/edge3/plugins/edge_executor_plugin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py index e5c3481ae8ff5..bbae741062384 100644 --- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py +++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py @@ -215,6 +215,10 @@ def change_maintenance_comment(self, worker_name: str): EDGE_EXECUTOR_ACTIVE = False # Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x) +# todo(jscheffl): Remove this check when the discussion in +# https://lists.apache.org/thread/w170czq6r7bslkqp1tk6bjjjo0789wgl +# resulted in a proper API to selective initialize. Maybe backcompat-shim +# is also needed to support Airflow-versions prior the rework. if AIRFLOW_V_3_0_PLUS: RUNNING_ON_APISERVER = sys.argv[1] in ["api-server"] if len(sys.argv) > 1 else False else: