
Conversation

@tanelk (Contributor) commented Nov 22, 2021

closes #19622

The scheduler_job can get stuck in a state where it is not able to queue new tasks. It will get out of this state on its own, but the time taken depends on the runtime of the currently running tasks - this could be several hours or even days.

If the scheduler can't queue any tasks because of the various concurrency limits (per pool, dag or task), then on the next iterations of the scheduler loop it will try to queue the same tasks again. Meanwhile there could be scheduled tasks with lower priority_weight that could be queued, but they will remain waiting.

The proposed solution is to keep track of the dag and task ids that are concurrency limited, and then repeat the query with these dags and tasks filtered out.
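To make that concrete, here is a minimal, runnable sketch of the iterative loop. This is not the actual scheduler_job.py code: the TI dataclass, find_queueable, running_per_dag and dag_limits are toy stand-ins, only the dag-level limit is modelled, and the real change also tracks task-level limits and does the candidate selection with SQL queries rather than in memory.

```python
# Sketch of the "track starved dags and repeat the query" idea behind this PR.
# Toy stand-ins only - not the real implementation in airflow/jobs/scheduler_job.py.
from dataclasses import dataclass


@dataclass(frozen=True)
class TI:
    """Toy stand-in for a scheduled TaskInstance row."""
    dag_id: str
    task_id: str
    priority_weight: int


def find_queueable(scheduled, running_per_dag, dag_limits, max_tis, max_iterations=5):
    """Pick up to max_tis TIs, re-selecting with starved dags filtered out."""
    starved_dags = set()   # dag_ids already known to be at their concurrency limit
    selected = set()
    queueable = []

    for _ in range(max_iterations):
        # Stand-in for re-running the SQL query: highest priority first,
        # starved dags excluded, limited to the slots still unaccounted for.
        candidates = sorted(
            (ti for ti in scheduled
             if ti.dag_id not in starved_dags and ti not in selected),
            key=lambda ti: -ti.priority_weight,
        )[: max_tis - len(queueable)]
        if not candidates:
            break  # nothing left that could possibly be queued

        for ti in candidates:
            if running_per_dag.get(ti.dag_id, 0) >= dag_limits.get(ti.dag_id, float("inf")):
                starved_dags.add(ti.dag_id)  # dag is full - skip it on the next pass
            else:
                queueable.append(ti)
                selected.add(ti)
                running_per_dag[ti.dag_id] = running_per_dag.get(ti.dag_id, 0) + 1

        if len(queueable) >= max_tis:
            break

    return queueable


if __name__ == "__main__":
    # Scenario from the issue: a dag at its limit fills the whole selection window.
    scheduled = [
        TI("big_dag", "a", priority_weight=10),
        TI("big_dag", "b", priority_weight=10),
        TI("small_dag", "c", priority_weight=1),
    ]
    running = {"big_dag": 4}                     # big_dag already has 4 running TIs
    limits = {"big_dag": 4, "small_dag": 4}
    # A single pass with max_tis=2 would select only the two big_dag TIs and queue
    # nothing; the iterative version marks big_dag as starved and queues "c".
    print(find_queueable(scheduled, running, limits, max_tis=2))
```

In this toy scenario the first pass selects the two high-priority TIs of the starved dag and queues nothing, which is exactly the stuck state described above; the second pass filters that dag out and finds the lower-priority task.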



Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

boring-cyborg bot added the area:Scheduler label Nov 22, 2021
boring-cyborg bot commented Nov 22, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@tanelk (Contributor, Author) commented:

Unrelated, but corrected a variable name that caused some confusion.

@ashb (Member) previously requested changes Nov 22, 2021

This offset approach won't work well with multiple schedulers running.

@tanelk requested a review from ashb November 23, 2021 06:49
@tanelk changed the title from "Fix Tasks get stuck in scheduled state" to "Fix Tasks getting stuck in scheduled state" Nov 23, 2021
@vapiravfif (Contributor) commented:

Apologies if this comment won't help the discussion, but in my opinion a better approach could be something done around the "limit" clause -
query = query.limit(max_tis) (origin)
and
max_tis = min(self.max_tis_per_query, self.executor.slots_available) (origin)

Because it seems that the assumption that it is better to limit the query of tasks waiting to be queued by the potentially available slots might not be the best practice in some cases.
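For context, here is a simplified sketch of the kind of query being discussed. It is not the exact Airflow query, which applies more joins and filters; it only shows where the quoted limit clause sits. The concurrency checks the thread is about happen afterwards, in Python, on the rows returned here, so a task instance past the limit is never even considered in that scheduler loop.

```python
# Simplified sketch, not the exact query from scheduler_job.py.
from sqlalchemy import desc
from sqlalchemy.orm import Session

from airflow.models import TaskInstance
from airflow.utils.state import State


def fetch_scheduled_tis(session: Session, max_tis: int):
    # Only the first max_tis rows (by priority) are considered in this scheduler
    # loop; everything past the limit waits for a later iteration.
    return (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.SCHEDULED)
        .order_by(desc(TaskInstance.priority_weight))
        .limit(max_tis)
        .all()
    )
```

The quoted max_tis = min(self.max_tis_per_query, self.executor.slots_available) only caps how many rows this query returns; it does not change which rows are picked first.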

@tanelk (Contributor, Author) commented Nov 23, 2021

> Apologies if this comment won't help the discussion, but in my opinion a better approach could be something done around the "limit" clause - query = query.limit(max_tis) (origin) and max_tis = min(self.max_tis_per_query, self.executor.slots_available) (origin)
>
> Because it seems that the assumption that it is better to limit the query of tasks waiting to be queued by the potentially available slots might not be the best practice in some cases.

Yes, reducing the value of max_tis will make this situation worse, but removing the max_tis = min(...) does not guarantee that the issue will be solved.

As long as we are using any limit value, there is a chance that none of the selected task instances can be queued, while there are task instances beyond the limit that could have been queued. For example, if the first max_tis scheduled task instances all belong to a dag that is already at its concurrency limit, none of them can be queued, even though a runnable lower-priority task from another dag sits just past the limit.

I can think of 3 possible solutions that will never cause the scheduler to get stuck:

  1. Remove the limit - could risk memory issues on a large Airflow installation (many dags and tasks).
  2. Build all the concurrency filters into the SQL query - the task-level limit seems impossible or very difficult to express in the current data model.
  3. Some sort of iterative approach that looks at the task instances further down the list - this is what this PR proposes.

@tanelk (Contributor, Author) commented Nov 23, 2021

> This offset approach won't work well with multiple schedulers running

@ashb I have now reworked the method.

@tanelk force-pushed the 19622_tasks_stuck_in_scheduled branch from 2619de4 to c4b9a54 on November 26, 2021 08:48
@ashb (Member) commented Nov 26, 2021

A lot more work this way, but I think this looks good. I'll need to take a look at this again with fresher eyes next week.

How much/where have you tested this change?

Tanel Kiis added 2 commits December 13, 2021 11:36
# Conflicts:
#	airflow/jobs/scheduler_job.py
@kaxil force-pushed the 19622_tasks_stuck_in_scheduled branch 2 times, most recently from 854ed2f to 95e9dff, on December 17, 2021 20:15
Address comments

Fix flaky test

Update test_scheduler_job.py
@kaxil force-pushed the 19622_tasks_stuck_in_scheduled branch from 95e9dff to 055fff7 on December 17, 2021 20:38
@kaxil (Member) commented Dec 17, 2021

Just rebased on latest main branch to fix merge conflicts

@ephraimbuddy (Contributor) commented:

@tanelk please rebase. I guess the failing tests have been resolved in main.

@jedcunningham (Member) left a comment:

We definitely need another committer to take a look at this as well.

github-actions bot added the "full tests needed" label Mar 22, 2022
github-actions bot commented:

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@ephraimbuddy added the "type:bug-fix" label Mar 22, 2022
@ephraimbuddy reopened this Mar 22, 2022
@ephraimbuddy merged commit cd68540 into apache:main Mar 22, 2022
boring-cyborg bot commented Mar 22, 2022

Awesome work, congrats on your first merged pull request!

ephraimbuddy pushed a commit that referenced this pull request Mar 22, 2022
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
(cherry picked from commit cd68540)
ephraimbuddy pushed a commit that referenced this pull request Mar 22, 2022
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
(cherry picked from commit cd68540)
@jedcunningham (Member) commented:

Thanks @tanelk! Congrats on your first commit 🎉

@tanelk deleted the 19622_tasks_stuck_in_scheduled branch March 23, 2022 16:10
ephraimbuddy pushed a commit that referenced this pull request Mar 23, 2022
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
(cherry picked from commit cd68540)
ephraimbuddy pushed a commit that referenced this pull request Mar 24, 2022
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
(cherry picked from commit cd68540)
ephraimbuddy pushed a commit that referenced this pull request Mar 26, 2022
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
(cherry picked from commit cd68540)

Labels

  • area:Scheduler - including HA (high availability) scheduler
  • full tests needed - We need to run full set of tests for this PR to merge
  • type:bug-fix - Changelog: Bug Fixes


Development

Successfully merging this pull request may close these issues:

  • Tasks get stuck in "scheduled" state and starved when dags with huge amount of tasks is scheduled

7 participants