Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,16 @@ class BaseExecutor(LoggingMixin):
supports_pickling: bool = True
supports_sentry: bool = False

job_id: None | int | str = None
callback_sink: BaseCallbackSink | None = None

is_local: bool = False
is_single_threaded: bool = False
change_sensor_mode_to_reschedule: bool = False
is_production: bool = True

change_sensor_mode_to_reschedule: bool = False
serve_logs: bool = False

job_id: None | int | str = None
callback_sink: BaseCallbackSink | None = None

def __init__(self, parallelism: int = PARALLELISM):
super().__init__()
self.parallelism: int = parallelism
Expand Down
7 changes: 5 additions & 2 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ class CeleryKubernetesExecutor(LoggingMixin):
supports_ad_hoc_ti_run: bool = True
supports_pickling: bool = True
supports_sentry: bool = False
change_sensor_mode_to_reschedule: bool = False
is_single_threaded: bool = False

is_local: bool = False
is_single_threaded: bool = False
is_production: bool = True

serve_logs: bool = False
change_sensor_mode_to_reschedule: bool = False

callback_sink: BaseCallbackSink | None = None

Expand Down
5 changes: 4 additions & 1 deletion airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ class DebugExecutor(BaseExecutor):
"""

_terminated = threading.Event()
change_sensor_mode_to_reschedule: bool = True

is_single_threaded: bool = True
is_production: bool = False

change_sensor_mode_to_reschedule: bool = True

def __init__(self):
super().__init__()
Expand Down
7 changes: 5 additions & 2 deletions airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ class LocalKubernetesExecutor(LoggingMixin):
supports_ad_hoc_ti_run: bool = True
supports_pickling: bool = False
supports_sentry: bool = False
change_sensor_mode_to_reschedule: bool = False
is_single_threaded: bool = False

is_local: bool = False
is_single_threaded: bool = False
is_production: bool = True

serve_logs: bool = True
change_sensor_mode_to_reschedule: bool = False

callback_sink: BaseCallbackSink | None = None

Expand Down
3 changes: 3 additions & 0 deletions airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ class SequentialExecutor(BaseExecutor):
"""

supports_pickling: bool = False

is_local: bool = True
is_single_threaded: bool = True
is_production: bool = False

serve_logs: bool = True

def __init__(self):
Expand Down
4 changes: 2 additions & 2 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@
<a href={{ get_docs_url("howto/set-up-database.html") }}><b>Click here</b></a> for more information.
{% endcall %}
{% endif %}
{% if sequential_executor_warning | default(false) %}
{% if production_executor_warning | default(false) %}
{% call show_message(category='warning', dismissible=false) %}
Do not use <b>SequentialExecutor</b> in production.
Do not use the <b>{{ production_executor_warning }}</b> in production.
<a href={{ get_docs_url("executor/index.html") }}><b>Click here</b></a> for more information.
{% endcall %}
{% endif %}
Expand Down
5 changes: 4 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.datasets import Dataset
from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
Expand Down Expand Up @@ -623,8 +624,10 @@ class AirflowBaseView(BaseView):
}

if not conf.getboolean("core", "unit_test_mode"):
executor, _ = ExecutorLoader.import_default_executor_cls()
extra_args["sqlite_warning"] = settings.engine.dialect.name == "sqlite"
extra_args["sequential_executor_warning"] = conf.get("core", "executor") == "SequentialExecutor"
if not executor.is_production:
extra_args["production_executor_warning"] = executor.__name__

line_chart_attr = {
"legend.maxKeyLength": 200,
Expand Down
4 changes: 4 additions & 0 deletions tests/executors/test_base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def test_is_single_threaded_default_value():
assert not BaseExecutor.is_single_threaded


def test_is_production_default_value():
assert BaseExecutor.is_production


def test_get_task_log():
executor = BaseExecutor()
ti = TaskInstance(task=BaseOperator(task_id="dummy"))
Expand Down
3 changes: 3 additions & 0 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def test_supports_sentry(self):
def test_is_local_default_value(self):
assert not CeleryKubernetesExecutor.is_local

def test_is_production_default_value(self):
assert CeleryKubernetesExecutor.is_production

def test_serve_logs_default_value(self):
assert not CeleryKubernetesExecutor.serve_logs

Expand Down
3 changes: 3 additions & 0 deletions tests/executors/test_debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,6 @@ def test_reschedule_mode(self):

def test_is_single_threaded(self):
assert DebugExecutor.is_single_threaded

def test_is_production_default_value(self):
assert not DebugExecutor.is_production
3 changes: 3 additions & 0 deletions tests/executors/test_local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def test_supports_sentry(self):
def test_is_local_default_value(self):
assert not LocalKubernetesExecutor.is_local

def test_is_production_default_value(self):
assert LocalKubernetesExecutor.is_production

def test_serve_logs_default_value(self):
assert LocalKubernetesExecutor.serve_logs

Expand Down
3 changes: 3 additions & 0 deletions tests/executors/test_sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def test_supports_sentry(self):
def test_is_local_default_value(self):
assert SequentialExecutor.is_local

def test_is_production_default_value(self):
assert not SequentialExecutor.is_production

def test_serve_logs_default_value(self):
assert SequentialExecutor.serve_logs

Expand Down