Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
"""
Clear tasks that were not yet launched, but were previously queued.

Tasks can end up in a "Queued" state through either the executor being
abruptly shut down (leaving a non-empty task_queue on this executor)
or when a rescheduled/deferred operator comes back up for execution
(with the same try_number) before the pod of its previous incarnation
has been fully removed (we think).
Tasks can end up in a "Queued" state when a rescheduled/deferred
operator comes back up for execution (with the same try_number) before the
pod of its previous incarnation has been fully removed (we think).

This method checks each of those tasks to see if the corresponding pod
It's also possible when an executor abruptly shuts down (leaving a non-empty
task_queue on that executor), but that scenario is handled via normal adoption.

This method checks each of our queued tasks to see if the corresponding pod
is around, and if not, and there's no matching entry in our own
task_queue, marks it for re-execution.
"""
Expand Down Expand Up @@ -779,7 +780,18 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
query_kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
# We will look for any pods owned by the no-longer-running scheduler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we know that scheduler_job_ids are all not running?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if you look up a few lines, we build that from the TIs the SchedulerJob asks us to try and adopt, and those TIs are tied to non-running SchedulerJobs.

# but will exclude only successful pods, as those TIs will have a terminal state
# and not be up for adoption!
# Those workers that failed, however, are okay to adopt here as their TI will
# still be in queued.
query_kwargs = {
"field_selector": "status.phase!=Succeeded",
"label_selector": (
"kubernetes_executor=True,"
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
),
}
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.adopt_launched_task(kube_client, pod, pod_ids)
Expand Down
20 changes: 16 additions & 4 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,9 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la
# First adoption
reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
namespace="default", label_selector="airflow-worker=1"
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti})
mock_adopt_completed_pods.assert_called_once()
Expand All @@ -672,7 +674,9 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la

reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
namespace="default", label_selector="airflow-worker=10"
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
)
mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated
mock_adopt_completed_pods.assert_called_once()
Expand All @@ -695,8 +699,16 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple
assert mock_kube_client.list_namespaced_pod.call_count == 2
mock_kube_client.list_namespaced_pod.assert_has_calls(
[
mock.call(namespace="default", label_selector="airflow-worker=10"),
mock.call(namespace="default", label_selector="airflow-worker=40"),
mock.call(
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
),
mock.call(
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
),
],
any_order=True,
)
Expand Down