From 79d36d06a55614f201fe1d94d45b1dfb35427e35 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Wed, 15 Feb 2023 09:55:56 -0800 Subject: [PATCH] Decouple production executor warning in dags UI Previously the views code was hardcoded to look out for the SequentialExecutor and warn to not use it in production. This change makes that generalized to any non-production executor --- airflow/executors/base_executor.py | 9 +++++---- airflow/executors/celery_kubernetes_executor.py | 7 +++++-- airflow/executors/debug_executor.py | 5 ++++- airflow/executors/local_kubernetes_executor.py | 7 +++++-- airflow/executors/sequential_executor.py | 3 +++ airflow/www/templates/airflow/dags.html | 4 ++-- airflow/www/views.py | 5 ++++- tests/executors/test_base_executor.py | 4 ++++ tests/executors/test_celery_kubernetes_executor.py | 3 +++ tests/executors/test_debug_executor.py | 3 +++ tests/executors/test_local_kubernetes_executor.py | 3 +++ tests/executors/test_sequential_executor.py | 3 +++ 12 files changed, 44 insertions(+), 12 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index fbb64d078e6d2..2b6f6e2fe4cc3 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -109,15 +109,16 @@ class BaseExecutor(LoggingMixin): supports_pickling: bool = True supports_sentry: bool = False - job_id: None | int | str = None - callback_sink: BaseCallbackSink | None = None - is_local: bool = False is_single_threaded: bool = False - change_sensor_mode_to_reschedule: bool = False + is_production: bool = True + change_sensor_mode_to_reschedule: bool = False serve_logs: bool = False + job_id: None | int | str = None + callback_sink: BaseCallbackSink | None = None + def __init__(self, parallelism: int = PARALLELISM): super().__init__() self.parallelism: int = parallelism diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index 667810acd4fa6..ea04b8a42b49d 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -41,10 +41,13 @@ class CeleryKubernetesExecutor(LoggingMixin): supports_ad_hoc_ti_run: bool = True supports_pickling: bool = True supports_sentry: bool = False - change_sensor_mode_to_reschedule: bool = False - is_single_threaded: bool = False + is_local: bool = False + is_single_threaded: bool = False + is_production: bool = True + serve_logs: bool = False + change_sensor_mode_to_reschedule: bool = False callback_sink: BaseCallbackSink | None = None diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index df6746e324189..c506453db984b 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -43,8 +43,11 @@ class DebugExecutor(BaseExecutor): """ _terminated = threading.Event() - change_sensor_mode_to_reschedule: bool = True + is_single_threaded: bool = True + is_production: bool = False + + change_sensor_mode_to_reschedule: bool = True def __init__(self): super().__init__() diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py index 2b19b40a87a2a..059aa11dcb35e 100644 --- a/airflow/executors/local_kubernetes_executor.py +++ b/airflow/executors/local_kubernetes_executor.py @@ -41,10 +41,13 @@ class LocalKubernetesExecutor(LoggingMixin): supports_ad_hoc_ti_run: bool = True supports_pickling: bool = False supports_sentry: bool = False - change_sensor_mode_to_reschedule: bool = False - is_single_threaded: bool = False + is_local: bool = False + is_single_threaded: bool = False + is_production: bool = True + serve_logs: bool = True + change_sensor_mode_to_reschedule: bool = False callback_sink: BaseCallbackSink | None = None diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index d409665f72abe..0f75c3d9304ca 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -45,8 +45,11 @@ class SequentialExecutor(BaseExecutor): """ supports_pickling: bool = False + is_local: bool = True is_single_threaded: bool = True + is_production: bool = False + serve_logs: bool = True def __init__(self): diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index f96c1244b2700..18583a2d95d8d 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -113,9 +113,9 @@ Click here for more information. {% endcall %} {% endif %} - {% if sequential_executor_warning | default(false) %} + {% if production_executor_warning | default(false) %} {% call show_message(category='warning', dismissible=false) %} - Do not use SequentialExecutor in production. + Do not use the {{ production_executor_warning }} in production. Click here for more information. {% endcall %} {% endif %} diff --git a/airflow/www/views.py b/airflow/www/views.py index c39d4013121ad..078d90700c12f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -89,6 +89,7 @@ from airflow.configuration import AIRFLOW_CONFIG, conf from airflow.datasets import Dataset from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning +from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.jobs.triggerer_job import TriggererJob @@ -623,8 +624,10 @@ class AirflowBaseView(BaseView): } if not conf.getboolean("core", "unit_test_mode"): + executor, _ = ExecutorLoader.import_default_executor_cls() extra_args["sqlite_warning"] = settings.engine.dialect.name == "sqlite" - extra_args["sequential_executor_warning"] = conf.get("core", "executor") == "SequentialExecutor" + if not executor.is_production: + extra_args["production_executor_warning"] = executor.__name__ line_chart_attr = { "legend.maxKeyLength": 200, diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index a93dbe9caf4e1..b7d14078da140 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -48,6 +48,10 @@ def test_is_single_threaded_default_value(): assert not BaseExecutor.is_single_threaded +def test_is_production_default_value(): + assert BaseExecutor.is_production + + def test_get_task_log(): executor = BaseExecutor() ti = TaskInstance(task=BaseOperator(task_id="dummy")) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index ebf4e33c5feeb..8da481800313c 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -40,6 +40,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert not CeleryKubernetesExecutor.is_local + def test_is_production_default_value(self): + assert CeleryKubernetesExecutor.is_production + def test_serve_logs_default_value(self): assert not CeleryKubernetesExecutor.serve_logs diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py index 6d8f6529829c7..cb939a1e6a761 100644 --- a/tests/executors/test_debug_executor.py +++ b/tests/executors/test_debug_executor.py @@ -121,3 +121,6 @@ def test_reschedule_mode(self): def test_is_single_threaded(self): assert DebugExecutor.is_single_threaded + + def test_is_production_default_value(self): + assert not DebugExecutor.is_production diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py index 09fb55ca8a887..f92279cfea7f1 100644 --- a/tests/executors/test_local_kubernetes_executor.py +++ b/tests/executors/test_local_kubernetes_executor.py @@ -35,6 +35,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert not LocalKubernetesExecutor.is_local + def test_is_production_default_value(self): + assert LocalKubernetesExecutor.is_production + def test_serve_logs_default_value(self): assert LocalKubernetesExecutor.serve_logs diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py index 400574e5bf233..94bea518571ad 100644 --- a/tests/executors/test_sequential_executor.py +++ b/tests/executors/test_sequential_executor.py @@ -32,6 +32,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert SequentialExecutor.is_local + def test_is_production_default_value(self): + assert not SequentialExecutor.is_production + def test_serve_logs_default_value(self): assert SequentialExecutor.serve_logs