Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import sys
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
Expand Down Expand Up @@ -213,12 +214,22 @@ def change_maintenance_comment(self, worker_name: str):
except AirflowConfigException:
EDGE_EXECUTOR_ACTIVE = False

# Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x)
# todo(jscheffl): Remove this check when the discussion in
# https://lists.apache.org/thread/w170czq6r7bslkqp1tk6bjjjo0789wgl
# resulted in a proper API to selective initialize. Maybe backcompat-shim
# is also needed to support Airflow-versions prior the rework.
if AIRFLOW_V_3_0_PLUS:
RUNNING_ON_APISERVER = sys.argv[1] in ["api-server"] if len(sys.argv) > 1 else False
else:
RUNNING_ON_APISERVER = "gunicorn" in sys.argv[0] and "airflow-webserver" in sys.argv


class EdgeExecutorPlugin(AirflowPlugin):
"""EdgeExecutor Plugin - provides API endpoints for Edge Workers in Webserver."""

name = "edge_executor"
if EDGE_EXECUTOR_ACTIVE:
if EDGE_EXECUTOR_ACTIVE and RUNNING_ON_APISERVER:
if AIRFLOW_V_3_0_PLUS:
fastapi_apps = [_get_api_endpoint()]
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import importlib
from unittest.mock import patch

import pytest
import time_machine
Expand Down Expand Up @@ -44,17 +45,20 @@ def test_plugin_inactive():


@pytest.mark.db_test
def test_plugin_active():
with conf_vars({("edge", "api_enabled"): "true"}):
def test_plugin_active_apiserver():
mock_cli = ["airflow", "api-server"] if AIRFLOW_V_3_0_PLUS else ["gunicorn", "airflow-webserver"]
with conf_vars({("edge", "api_enabled"): "true"}), patch("sys.argv", mock_cli):
importlib.reload(edge_executor_plugin)

from airflow.providers.edge3.plugins.edge_executor_plugin import (
EDGE_EXECUTOR_ACTIVE,
RUNNING_ON_APISERVER,
EdgeExecutorPlugin,
)

rep = EdgeExecutorPlugin()
assert EDGE_EXECUTOR_ACTIVE
assert RUNNING_ON_APISERVER
if AIRFLOW_V_3_0_PLUS:
assert len(rep.appbuilder_views) == 0
assert len(rep.flask_blueprints) == 0
Expand All @@ -64,6 +68,27 @@ def test_plugin_active():
assert len(rep.flask_blueprints) == 2


@patch("sys.argv", ["airflow", "some-other-command"])
def test_plugin_active_non_apiserver():
with conf_vars({("edge", "api_enabled"): "true"}):
importlib.reload(edge_executor_plugin)

from airflow.providers.edge3.plugins.edge_executor_plugin import (
EDGE_EXECUTOR_ACTIVE,
RUNNING_ON_APISERVER,
EdgeExecutorPlugin,
)

rep = EdgeExecutorPlugin()
assert EDGE_EXECUTOR_ACTIVE
assert not RUNNING_ON_APISERVER
assert len(rep.appbuilder_views) == 0
assert len(rep.flask_blueprints) == 0
assert len(rep.appbuilder_views) == 0
if AIRFLOW_V_3_0_PLUS:
assert len(rep.fastapi_apps) == 0


@pytest.fixture
def plugin():
from airflow.providers.edge3.plugins.edge_executor_plugin import EdgeExecutorPlugin
Expand Down