Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non
from airflow.models.taskinstance import TaskInstance

hybrid_executor_enabled = hasattr(TaskInstance, "executor")
default_executor = None
default_executor_alias = None
if hybrid_executor_enabled:
from airflow.executors.executor_loader import ExecutorLoader

default_executor = str(ExecutorLoader.get_default_executor_name())
default_executor = default_executor.strip(":")
default_executor_name = ExecutorLoader.get_default_executor_name()
default_executor_alias = default_executor_name.alias

with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
self.log.debug("Clearing tasks that have not been launched")
Expand All @@ -254,7 +254,10 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non
)
if self.kubernetes_queue:
query = query.where(TaskInstance.queue == self.kubernetes_queue)
elif hybrid_executor_enabled and default_executor == KUBERNETES_EXECUTOR:
# KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this
# module. The ExecutorName for "core" executors always contains an alias and cannot be modified
# to be different from the constant (in this case KUBERNETES_EXECUTOR).
elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR:
query = query.where(
or_(
TaskInstance.executor == KUBERNETES_EXECUTOR,
Expand Down