diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index fbb64d078e6d2..2b6f6e2fe4cc3 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -109,15 +109,16 @@ class BaseExecutor(LoggingMixin): supports_pickling: bool = True supports_sentry: bool = False - job_id: None | int | str = None - callback_sink: BaseCallbackSink | None = None - is_local: bool = False is_single_threaded: bool = False - change_sensor_mode_to_reschedule: bool = False + is_production: bool = True + change_sensor_mode_to_reschedule: bool = False serve_logs: bool = False + job_id: None | int | str = None + callback_sink: BaseCallbackSink | None = None + def __init__(self, parallelism: int = PARALLELISM): super().__init__() self.parallelism: int = parallelism diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index 667810acd4fa6..ea04b8a42b49d 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -41,10 +41,13 @@ class CeleryKubernetesExecutor(LoggingMixin): supports_ad_hoc_ti_run: bool = True supports_pickling: bool = True supports_sentry: bool = False - change_sensor_mode_to_reschedule: bool = False - is_single_threaded: bool = False + is_local: bool = False + is_single_threaded: bool = False + is_production: bool = True + serve_logs: bool = False + change_sensor_mode_to_reschedule: bool = False callback_sink: BaseCallbackSink | None = None diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index df6746e324189..c506453db984b 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -43,8 +43,11 @@ class DebugExecutor(BaseExecutor): """ _terminated = threading.Event() - change_sensor_mode_to_reschedule: bool = True + is_single_threaded: bool = True + is_production: bool = False + + change_sensor_mode_to_reschedule: bool = True def __init__(self): super().__init__() diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py index 2b19b40a87a2a..059aa11dcb35e 100644 --- a/airflow/executors/local_kubernetes_executor.py +++ b/airflow/executors/local_kubernetes_executor.py @@ -41,10 +41,13 @@ class LocalKubernetesExecutor(LoggingMixin): supports_ad_hoc_ti_run: bool = True supports_pickling: bool = False supports_sentry: bool = False - change_sensor_mode_to_reschedule: bool = False - is_single_threaded: bool = False + is_local: bool = False + is_single_threaded: bool = False + is_production: bool = True + serve_logs: bool = True + change_sensor_mode_to_reschedule: bool = False callback_sink: BaseCallbackSink | None = None diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index d409665f72abe..0f75c3d9304ca 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -45,8 +45,11 @@ class SequentialExecutor(BaseExecutor): """ supports_pickling: bool = False + is_local: bool = True is_single_threaded: bool = True + is_production: bool = False + serve_logs: bool = True def __init__(self): diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index f96c1244b2700..18583a2d95d8d 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -113,9 +113,9 @@ Click here for more information. {% endcall %} {% endif %} - {% if sequential_executor_warning | default(false) %} + {% if production_executor_warning | default(false) %} {% call show_message(category='warning', dismissible=false) %} - Do not use SequentialExecutor in production. + Do not use the {{ production_executor_warning }} in production. Click here for more information. {% endcall %} {% endif %} diff --git a/airflow/www/views.py b/airflow/www/views.py index c39d4013121ad..078d90700c12f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -89,6 +89,7 @@ from airflow.configuration import AIRFLOW_CONFIG, conf from airflow.datasets import Dataset from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning +from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.jobs.triggerer_job import TriggererJob @@ -623,8 +624,10 @@ class AirflowBaseView(BaseView): } if not conf.getboolean("core", "unit_test_mode"): + executor, _ = ExecutorLoader.import_default_executor_cls() extra_args["sqlite_warning"] = settings.engine.dialect.name == "sqlite" - extra_args["sequential_executor_warning"] = conf.get("core", "executor") == "SequentialExecutor" + if not executor.is_production: + extra_args["production_executor_warning"] = executor.__name__ line_chart_attr = { "legend.maxKeyLength": 200, diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index a93dbe9caf4e1..b7d14078da140 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -48,6 +48,10 @@ def test_is_single_threaded_default_value(): assert not BaseExecutor.is_single_threaded +def test_is_production_default_value(): + assert BaseExecutor.is_production + + def test_get_task_log(): executor = BaseExecutor() ti = TaskInstance(task=BaseOperator(task_id="dummy")) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index ebf4e33c5feeb..8da481800313c 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -40,6 +40,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert not CeleryKubernetesExecutor.is_local + def test_is_production_default_value(self): + assert CeleryKubernetesExecutor.is_production + def test_serve_logs_default_value(self): assert not CeleryKubernetesExecutor.serve_logs diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py index 6d8f6529829c7..cb939a1e6a761 100644 --- a/tests/executors/test_debug_executor.py +++ b/tests/executors/test_debug_executor.py @@ -121,3 +121,6 @@ def test_reschedule_mode(self): def test_is_single_threaded(self): assert DebugExecutor.is_single_threaded + + def test_is_production_default_value(self): + assert not DebugExecutor.is_production diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py index 09fb55ca8a887..f92279cfea7f1 100644 --- a/tests/executors/test_local_kubernetes_executor.py +++ b/tests/executors/test_local_kubernetes_executor.py @@ -35,6 +35,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert not LocalKubernetesExecutor.is_local + def test_is_production_default_value(self): + assert LocalKubernetesExecutor.is_production + def test_serve_logs_default_value(self): assert LocalKubernetesExecutor.serve_logs diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py index 400574e5bf233..94bea518571ad 100644 --- a/tests/executors/test_sequential_executor.py +++ b/tests/executors/test_sequential_executor.py @@ -32,6 +32,9 @@ def test_supports_sentry(self): def test_is_local_default_value(self): assert SequentialExecutor.is_local + def test_is_production_default_value(self): + assert not SequentialExecutor.is_production + def test_serve_logs_default_value(self): assert SequentialExecutor.serve_logs