From 0b86c2eab10b29ca75de0e1d9622d5a850420478 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 12 Jan 2023 12:58:41 -0700 Subject: [PATCH 1/2] Be more selective when adopting pods with KubernetesExecutor When trying to adopt "resettable" TIs from SchedulerJob, we should not list out all the pods to compare against, only those that didn't succeed. This means we will get any pods that are still starting, running, or failed (meaning the TI wasn't moved to a terminal state there, and will be in out "adoptable" list). This avoids the scenario where a dead scheduler has both a completed, successful worker, and a still running worker, causing log lines like these about the successful one: ERROR - attempting to adopt taskinstance which was not specified by database: TaskInstanceKey(...) This also makes sure we only find pods with the `kubernetes_executor=True` label for extra safety. Closes #28071 --- airflow/executors/kubernetes_executor.py | 23 ++++++++++++++------- tests/executors/test_kubernetes_executor.py | 20 ++++++++++++++---- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 8d3d905be9552..14968ab534e1a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -496,13 +496,14 @@ def clear_not_launched_queued_tasks(self, session=None) -> None: """ Clear tasks that were not yet launched, but were previously queued. - Tasks can end up in a "Queued" state through either the executor being - abruptly shut down (leaving a non-empty task_queue on this executor) - or when a rescheduled/deferred operator comes back up for execution - (with the same try_number) before the pod of its previous incarnation - has been fully removed (we think). + Tasks can end up in a "Queued" state through when a rescheduled/deferred + operator comes back up for execution (with the same try_number) before the + pod of its previous incarnation has been fully removed (we think). - This method checks each of those tasks to see if the corresponding pod + It's also possible when an executor abruptly shuts down (leaving a non-empty + task_queue on that executor), but that scenario is handled via normal adoption. + + This method checks each of our queued tasks to see if the corresponding pod is around, and if not, and there's no matching entry in our own task_queue, marks it for re-execution. """ @@ -772,7 +773,15 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task kube_client: client.CoreV1Api = self.kube_client for scheduler_job_id in scheduler_job_ids: scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id)) - query_kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"} + # We will look for any pods owned by the no-longer-running scheduler, + # but will exclude only successful pods, as those TIs will have a terminal state + # and not be up for adoption! + # Those workers that failed, however, are okay to adopt here as their TI will + # still be in queued. + query_kwargs = { + "field_selector": "status.phase!=Succeeded", + "label_selector": f"kubernetes_executor=True,airflow-worker={scheduler_job_id}", + } pod_list = self._list_pods(query_kwargs) for pod in pod_list: self.adopt_launched_task(kube_client, pod, pod_ids) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 8e30835aa3e73..ae9c8d174b0e8 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -644,7 +644,9 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la # First adoption reset_tis = executor.try_adopt_task_instances([mock_ti]) mock_kube_client.list_namespaced_pod.assert_called_once_with( - namespace="default", label_selector="airflow-worker=1" + namespace="default", + field_selector="status.phase!=Succeeded", + label_selector="kubernetes_executor=True,airflow-worker=1", ) mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti}) mock_adopt_completed_pods.assert_called_once() @@ -662,7 +664,9 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la reset_tis = executor.try_adopt_task_instances([mock_ti]) mock_kube_client.list_namespaced_pod.assert_called_once_with( - namespace="default", label_selector="airflow-worker=10" + namespace="default", + field_selector="status.phase!=Succeeded", + label_selector="kubernetes_executor=True,airflow-worker=10", ) mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated mock_adopt_completed_pods.assert_called_once() @@ -685,8 +689,16 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple assert mock_kube_client.list_namespaced_pod.call_count == 2 mock_kube_client.list_namespaced_pod.assert_has_calls( [ - mock.call(namespace="default", label_selector="airflow-worker=10"), - mock.call(namespace="default", label_selector="airflow-worker=40"), + mock.call( + namespace="default", + field_selector="status.phase!=Succeeded", + label_selector="kubernetes_executor=True,airflow-worker=10", + ), + mock.call( + namespace="default", + field_selector="status.phase!=Succeeded", + label_selector="kubernetes_executor=True,airflow-worker=40", + ), ], any_order=True, ) From c6c164de144689eda76a640b7dcc3bb59c93c289 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 12 Jan 2023 14:32:43 -0700 Subject: [PATCH 2/2] Also ignore done pods --- airflow/executors/kubernetes_executor.py | 5 ++++- tests/executors/test_kubernetes_executor.py | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 14968ab534e1a..9c9dbc9ffdfb5 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -780,7 +780,10 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task # still be in queued. query_kwargs = { "field_selector": "status.phase!=Succeeded", - "label_selector": f"kubernetes_executor=True,airflow-worker={scheduler_job_id}", + "label_selector": ( + "kubernetes_executor=True," + f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True" + ), } pod_list = self._list_pods(query_kwargs) for pod in pod_list: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index ae9c8d174b0e8..5a062be0ab7b0 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -646,7 +646,7 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la mock_kube_client.list_namespaced_pod.assert_called_once_with( namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=1", + label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True", ) mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti}) mock_adopt_completed_pods.assert_called_once() @@ -666,7 +666,7 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la mock_kube_client.list_namespaced_pod.assert_called_once_with( namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=10", + label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", ) mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated mock_adopt_completed_pods.assert_called_once() @@ -692,12 +692,12 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple mock.call( namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=10", + label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", ), mock.call( namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=40", + label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True", ), ], any_order=True,