
Conversation

@dirrao
Contributor

@dirrao dirrao commented Jun 29, 2023

Schedulers race for pod adoption, which leads to abrupt pod deletes when a scheduler's heartbeat is delayed. The scheduler is still alive, not dead; its heartbeat is merely delayed due to a network timeout, heavy processing, etc.
This PR fixes the abrupt pod deletes that occur when a scheduler's heartbeat is delayed.
Closes: #31198

@boring-cyborg boring-cyborg bot added the provider:cncf-kubernetes and area:Scheduler labels on Jun 29, 2023
@boring-cyborg

boring-cyborg bot commented Jun 29, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@dstandish
Contributor

dstandish commented Jun 29, 2023

hi, welcome @dirrao

From looking at the changed lines alone, it's hard to understand what the effect of the change is and why it has that effect. It's also important to be sure that it doesn't break anything, e.g. how do we know pods will still be deleted in "legitimate" circumstances? Can you add some explanation of how your change works? Additionally, I see that no tests were added, though one was updated. It seems likely that we are missing coverage for the scenario you are trying to address. Can you also make sure that we have coverage for the scenario we don't want to break, i.e. the scenario in which it was working properly?

@dirrao
Contributor Author

dirrao commented Jun 30, 2023

Hi @dstandish / @jedcunningham

Problem:

When a scheduler creates a worker pod for a task, it attaches a label to the pod: airflow-worker=<scheduler_job_id>. This label is a unique identifier that indicates which scheduler is tracking that worker pod.
Each scheduler listens only for events from the pods it started. It watches pod events through the Kubernetes watch API with the condition label_selector: airflow-worker=<my_job_id>, so each scheduler sees only its own pods' events.
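
For illustration only, here is a rough standalone sketch of creating such a labelled worker pod with the Kubernetes Python client; the pod name, namespace, and job id below are placeholders, not Airflow's actual executor code:

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

scheduler_job_id = "1"  # placeholder: the creating scheduler's job id

# The worker pod carries an airflow-worker label pointing at the scheduler that created it.
pod = client.V1Pod(
    metadata=client.V1ObjectMeta(
        name="example-worker",
        labels={"airflow-worker": scheduler_job_id},
    ),
    spec=client.V1PodSpec(
        restart_policy="Never",
        containers=[client.V1Container(name="base", image="ubuntu", command=["sleep", "3600"])],
    ),
)
v1.create_namespaced_pod(namespace="<namespace>", body=pod)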

So now the following is happening:

  1. Let's say scheduler1 with id 1 started a task in a worker pod labeled airflow-worker=1, and its heartbeat got delayed.
  2. Scheduler2 with id 2 now thinks scheduler1 is dead because its heartbeat was not received on time (even though scheduler1 is still alive).
  3. So scheduler2 adopts scheduler1's tasks. This means scheduler2 updates the pod's label with scheduler2's job id, i.e. the label changes from airflow-worker=1 to airflow-worker=2.
  4. On this label update, the Kubernetes watch API sends a DELETED event for airflow-worker=1 and an ADDED event for airflow-worker=2.
  5. Since scheduler1 is still alive, it keeps listening for all events with label airflow-worker=1. It receives the DELETED event described in point 4, so it thinks the pod was deleted while it was in the running phase and runs the task-failure handling. One of those steps is pod cleanup, which deletes the pod to avoid dangling pods, so it sends a delete request to the Kubernetes API.

Solution: Change the Airflow Kubernetes watch label selector filter from kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"} to kwargs = {"label_selector": "airflow-worker"}, and then filter the events in the scheduler by airflow-worker=<my_job_id>.

QA: I have updated the existing test case to ensure the current functionality is preserved. However, I am not sure how to write test cases for the multi-scheduler case. Can you share some references for that?

@eladkal eladkal requested a review from hussein-awala July 14, 2023 20:18
@hussein-awala
Member

Good investigation, I just created a simple pod for testing:

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
  labels:
    airflow-worker: scheduler1
spec:
  restartPolicy: Always
  containers:
  - name: base
    image: ubuntu
    command: ["tail"]
    args: ["-f", "/dev/null"]

and a watcher:

from kubernetes import client, config, watch

if __name__ == '__main__':
    config.load_kube_config(context="<context>")
    v1 = client.CoreV1Api()

    kwargs = {"label_selector": "airflow-worker=scheduler1"}

    w = watch.Watch()
    for event in w.stream(v1.list_namespaced_pod, "<namespace>", **kwargs):
        print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))

and I got:

Event: DELETED Pod test-pod

when I patched the label with:

kubectl --context <my context> --namespace <my namespace> label pod/test-pod airflow-worker=scheduler2 --overwrite

However, I don't think that your change could fix this issue. When we call:

for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs)

Scheduler 1 will fetch only the events for pods with airflow-worker=scheduler1 at the moment of the call, so this issue happens after the pod list is fetched; in that case, labels.get("airflow-worker", None) != scheduler_job_id will be false. Instead, you could add a check in the process_status method before adding the pod to the watcher_queue with a failed state: when we receive a DELETED event, we could fetch the last state of the pod (with the new labels) and compare the airflow-worker label with the scheduler id. If it's the same (normal case), we fail the TI; if not (the event was created by an adoption operation), we skip failing the TI. WDYT?
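
Roughly something like this (just an illustrative sketch with made-up helper names, not the actual process_status code):

from kubernetes import client


def should_fail_ti_on_deleted_event(v1: client.CoreV1Api, namespace: str, pod_name: str, scheduler_job_id: str) -> bool:
    # Illustrative sketch only. On a DELETED watch event, re-read the pod: if it is really
    # gone, or still labelled with this scheduler's id, treat it as a genuine failure;
    # if another scheduler's id is on it, the event was only caused by adoption
    # relabelling, so we skip failing the TI.
    try:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
    except client.ApiException as exc:
        if exc.status == 404:
            return True  # the pod really no longer exists
        raise
    labels = pod.metadata.labels or {}
    return labels.get("airflow-worker") == scheduler_job_id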

(You need to merge/rebase master because #30727 has moved these methods to a new module)

@dirrao
Contributor Author

dirrao commented Jul 15, 2023

@hussein-awala
Thanks for sharing the example. The root cause of this issue is that schedulers subscribe to their worker pod events using a scheduler-id label filter, i.e. kwargs = {"label_selector": "airflow-worker=scheduler1"}. Whenever the label of a worker pod changes from airflow-worker=scheduler1 to airflow-worker=scheduler2, that worker pod's events go out of scope for scheduler1. That's why we see a DELETED event even though the worker pod is still running. I have proposed subscribing to all worker pod events, i.e. kwargs = {"label_selector": "airflow-worker"}, so each scheduler receives every worker pod event (for both scheduler1 and scheduler2). In this specific case, scheduler1 now receives a MODIFIED event instead of a DELETED event during the label change. To avoid processing other schedulers' worker pod events, we drop events based on the scheduler-id label, i.e. labels.get("airflow-worker", None) != scheduler_job_id. I will rebase my PR and update it soon.

Note: This issue is happening for worker pods in all phases (PENDING, RUNNING, etc.)

existing code

kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}

proposed code

kwargs = {"label_selector": "airflow-worker"}

if labels.get("airflow-worker", None) != scheduler_job_id:
    # Event belongs to a pod tracked by another scheduler: record the resource
    # version so the watch can resume from this point, then skip the event.
    last_resource_version = task.metadata.resource_version
    continue

Example

from kubernetes import client, config, watch

if __name__ == '__main__':
    config.load_kube_config(context="<context>")
    v1 = client.CoreV1Api()

    kwargs = {"label_selector": "airflow-worker"}

    w = watch.Watch()
    for event in w.stream(v1.list_namespaced_pod, "<namespace>", **kwargs):
        print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
        # read the pod's current labels from the event object
        labels = event['object'].metadata.labels or {}
        if labels.get("airflow-worker", None) != "scheduler1":
            print("Skipping the event of other scheduler i.e scheduler2")
        else:
            print("Processing the event of current scheduler i.e scheduler1")

and I got:

Event: MODIFIED Pod test-pod
Skipping the event of other scheduler i.e scheduler2

when I patched the label with:

kubectl --context <my context> --namespace <my namespace> label pod/test-pod airflow-worker=scheduler2 --overwrite

@hussein-awala
Member

Whenever the label of a worker pod changes from airflow-worker=scheduler1 to airflow-worker=scheduler2, that worker pod's events go out of scope for scheduler1.

OK, it's clearer now: the produced event tells us that the watcher no longer finds the watched pod using the current selector.

I will run more tests to make sure that it doesn't break anything. Could you add a unit test? (Don't forget to merge master first.)

@dirrao dirrao force-pushed the k8s-executor-avoid-abrupt-pod-deletes branch from f5b819d to fc08dfa Compare July 17, 2023 16:02
@dirrao
Contributor Author

dirrao commented Jul 17, 2023

@hussein-awala
I have rebased the code on the main branch and added unit test cases. Can you review it?

@dirrao dirrao requested a review from hussein-awala July 18, 2023 08:38
@hussein-awala
Member

@dstandish @jedcunningham could you review this PR before merging it?

Comment on lines +149 to +151
# Schedulers are issuing abrupt pod deletes when there is a delay in schedulers' heartbeat
# https://github.com/apache/airflow/issues/31198
# Added below scheduler_job_id condition to skip the events of pods created by other schedulers
Contributor

I would move these comments (here and on line 132) to (1) the commit message and (2) the test. I don't think they are needed here.

@jedcunningham
Member

In theory this was fixed in #31274 which was in 2.6.2. Have you reproduced this in main?

@dstandish
Contributor

Hey @dirrao, thanks for the very helpful explanation and nice fix for this. I was hoping maybe we could figure out a way to not watch all schedulers' pods... but it seems there may not be a way... is that your assessment as well?

@dstandish
Contributor

Ah yes thanks @jedcunningham

Yeah, I tried this out locally (manually watching pods and messing with them). I guess the idea is to use the pod's deletion_timestamp metadata attribute to determine whether it was actually deleted.

Seems like it should work:

from kubernetes import client, config, watch

if __name__ == '__main__':
    config.load_kube_config()
    v1 = client.CoreV1Api()

    kwargs = {"label_selector": "airflow-worker=scheduler-1"}

    w = watch.Watch()
    for event in w.stream(v1.list_namespaced_pod, "default", **kwargs):
        print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
        if event['type'] == "DELETED":
            if event['object'].metadata.deletion_timestamp:
                print("Pod was deleted")
            else:
                print("Pod was not actually deleted")

@dirrao
Contributor Author

dirrao commented Jul 21, 2023

Hi @dstandish, @jedcunningham, @hussein-awala
Good to know this is already fixed. deletion_timestamp made things much easier. I have verified a few scenarios and the existing fix is working. Thanks everyone for helping me.
