Allow for retry when tasks are stuck in queued #43520
Merged
+507
−94
Conversation
dstandish reviewed Oct 30, 2024
o-nikolas reviewed Oct 30, 2024
dstandish reviewed Oct 30, 2024
dstandish reviewed Oct 30, 2024
…ed_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks should not be stuck in queued for a long time. Originally, we simply marked a task as failed when it was stuck in queued for too long. We found that this led to suboptimal outcomes, as ideally we would like "failed" to mean that a task was unable to run, rather than that we were unable to run the task. As a compromise between always failing a stuck task and always rescheduling a stuck task (which could lead to tasks being stuck in queued forever without informing the user), we have created the config `AIRFLOW__CORE__NUM_STUCK_RETRIES`. With this new configuration, an airflow admin can decide how sensitive they would like their airflow to be WRT failing stuck tasks.
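The retry-versus-fail decision described above can be sketched as a small, hypothetical function (this is illustrative, not the actual Airflow implementation; `NUM_STUCK_RETRIES` here stands in for the value read from `AIRFLOW__CORE__NUM_STUCK_RETRIES`):

```python
# Hypothetical sketch: retry a stuck-in-queued task a limited number of
# times before giving up and failing it, per the compromise described above.
NUM_STUCK_RETRIES = 2  # stands in for AIRFLOW__CORE__NUM_STUCK_RETRIES

def next_action(times_stuck_so_far: int) -> str:
    """Decide what to do with a task instance found stuck in queued."""
    if times_stuck_so_far < NUM_STUCK_RETRIES:
        return "reschedule"  # revoke from executor, set back to scheduled
    return "fail"  # retries exhausted: surface the problem to the user

print(next_action(0))
print(next_action(2))
```

A higher threshold makes the scheduler more tolerant of transient executor hiccups; a lower one surfaces infrastructure problems to the user sooner.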
1f8b642 to 8eb60b1
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
…s since they will not have the scheduler change
…flow into handle-stuck-in-queue
…flow into handle-stuck-in-queue
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
…flow into handle-stuck-in-queue
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
jscheffl reviewed Nov 16, 2024
jscheffl pushed a commit to jscheffl/airflow that referenced this pull request Nov 18, 2024
The old "stuck in queued" logic just failed the tasks. Now we requeue them. We accomplish this by revoking the task from the executor and setting its state to scheduled. We'll re-queue it up to 2 times; the number of times is configurable by a hidden config.

We added a method revoke_task to the base executor because it's a discrete operation that is required for this feature, and it might be useful in other cases, e.g. when detecting zombies.

We set state to failed or scheduled directly from the scheduler (rather than sending through the event buffer) because the event buffer makes more sense for handling external events -- why round trip through the executor and back to the scheduler when the scheduler is initiating the action? This also avoids having to deal with "state mismatch" issues when processing events.

---------

(cherry picked from commit a41feeb)

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
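The flow in the commit message above can be sketched as follows. This is a simplified, hypothetical model: names like `FakeExecutor` and `handle_stuck_in_queued` mirror the description but are not the actual Airflow internals, and the task instance is modeled as a plain dict.

```python
# Simplified, hypothetical sketch of the requeue flow described above.
MAX_REQUEUE_ATTEMPTS = 2  # configurable via a hidden config in the real PR

class FakeExecutor:
    """Stands in for a BaseExecutor subclass with the new revoke_task."""
    def __init__(self):
        self.revoked = []

    def revoke_task(self, ti):
        # Discrete operation: make the executor forget the queued task.
        self.revoked.append(ti["id"])

def handle_stuck_in_queued(ti, executor):
    """Requeue a stuck task, or fail it after too many attempts."""
    executor.revoke_task(ti)
    if ti["stuck_count"] < MAX_REQUEUE_ATTEMPTS:
        ti["stuck_count"] += 1
        # The scheduler sets the state directly, no event-buffer round trip.
        ti["state"] = "scheduled"
    else:
        ti["state"] = "failed"

ti = {"id": "t1", "state": "queued", "stuck_count": 0}
executor = FakeExecutor()
handle_stuck_in_queued(ti, executor)
print(ti["state"])  # scheduled on the first stuck detection
```

Setting the state directly from the scheduler, rather than round-tripping through the executor's event buffer, is what avoids the "state mismatch" issues the commit message mentions.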
kandharvishnu pushed a commit to kandharvishnu/airflow that referenced this pull request Nov 19, 2024
jscheffl added a commit that referenced this pull request Nov 19, 2024
…44158)

* [v2-10-test] Re-queue tasks when they are stuck in queued (#43520)

The old "stuck in queued" logic just failed the tasks. Now we requeue them. We accomplish this by revoking the task from the executor and setting its state to scheduled. We'll re-queue it up to 2 times; the number of times is configurable by a hidden config.

We added a method revoke_task to the base executor because it's a discrete operation that is required for this feature, and it might be useful in other cases, e.g. when detecting zombies.

We set state to failed or scheduled directly from the scheduler (rather than sending through the event buffer) because the event buffer makes more sense for handling external events -- why round trip through the executor and back to the scheduler when the scheduler is initiating the action? This also avoids having to deal with "state mismatch" issues when processing events.

(cherry picked from commit a41feeb)

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* fix test_handle_stuck_queued_tasks_multiple_attempts (#44093)

---------

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: GPK <gopidesupavan@gmail.com>
dstandish added a commit that referenced this pull request Nov 19, 2024
This is a fix-up / follow-up to #43520. It does not really make a material difference; I'm just avoiding use of the session decorator, and the create/dispose session logic, when they are not needed. I also commit as I go along, since there's no reason to handle multiple distinct TIs in the same transaction.
utkarsharma2 pushed a commit that referenced this pull request Dec 4, 2024
utkarsharma2 pushed a commit that referenced this pull request Dec 9, 2024
ashb pushed a commit that referenced this pull request Aug 12, 2025
In issue #51301, it was reported that failure callbacks do not run for task instances that get stuck in queued and fail in Airflow 2.10.5. This is happening due to the changes introduced in PR #43520. In that PR, logic was introduced to requeue tasks that get stuck in queued (up to two times by default) before failing them.

Previously, the executor's fail method was called when the task needed to be failed after max requeue attempts. This was replaced in the PR by the task instance's set_state method: ti.set_state(TaskInstanceState.FAILED, session=session). Without the executor's fail method being called, failure callbacks will not be executed for such task instances. Therefore, I changed the code to call the executor's fail method instead in Airflow 3.
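The bug and fix described above can be sketched with toy classes. This is hypothetical and simplified: in real Airflow, the executor's fail() path triggers the failure-handling machinery (including callbacks), while a bare set_state() only updates the recorded state.

```python
# Hypothetical sketch of the callback bug and its fix described above.
class TaskInstance:
    def __init__(self):
        self.state = "queued"
        self.callbacks_ran = []

    def set_state(self, state):
        self.state = state  # no callback fires here: the reported bug

class Executor:
    def fail(self, ti):
        ti.set_state("failed")
        ti.callbacks_ran.append("on_failure")  # callback runs on this path

buggy = TaskInstance()
buggy.set_state("failed")   # old code path: state changes, no callback
fixed = TaskInstance()
Executor().fail(fixed)      # fixed code path: callback runs
print(buggy.callbacks_ran, fixed.callbacks_ran)
```

The point of the fix is that both paths end with the task in the failed state, but only the executor path notifies the user's failure callbacks.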
ashb pushed a commit that referenced this pull request Aug 12, 2025
…#53435)

In issue #51301, it was reported that failure callbacks do not run for task instances that get stuck in queued and fail in Airflow 2.10.5. This is happening due to the changes introduced in PR #43520. In this PR, logic was introduced to requeue tasks that get stuck in queued (up to two times by default) before failing them. Previously, the executor's fail method was called when the task needed to be failed after max requeue attempts. This was replaced by the task instance's set_state method in the PR: ti.set_state(TaskInstanceState.FAILED, session=session). Without the executor's fail method being called, failure callbacks will not be executed for such task instances. Therefore, I changed the code to call the executor's fail method instead in Airflow 3.

(cherry picked from commit 6da77b1)

Co-authored-by: Karen Braganza <karenbraganza15@gmail.com>
ashb pushed a commit that referenced this pull request Aug 12, 2025
ashb pushed a commit that referenced this pull request Aug 13, 2025
kaxil pushed a commit that referenced this pull request Aug 13, 2025
Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time.

Originally, we simply marked a task as failed when it was stuck in queued for
too long. We found that this led to suboptimal outcomes, as ideally we would like "failed"
to mean that a task was unable to run, rather than that we were unable to run the task.

As a compromise between always failing a stuck task and always rescheduling a stuck task (which could
lead to tasks being stuck in queued forever without informing the user), we have created the config
`[core] num_stuck_reschedules`. With this new configuration, an airflow admin can decide how sensitive they would like their airflow to be WRT failing stuck tasks.
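A minimal example of how an admin might set this in airflow.cfg, assuming the option lives in the `[core]` section as the description above states (the exact option name in the released version may differ, and the PR notes it is a hidden config):

```ini
[core]
# Illustrative value: how many times to reschedule a task found stuck in
# queued before failing it. Higher values tolerate transient executor
# problems; lower values surface stuck tasks to the user sooner.
num_stuck_reschedules = 3
```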
Here is an example of what it looks like after trying this out with the celery executor: