@AutomationDev85
Contributor

@AutomationDev85 AutomationDev85 commented Apr 18, 2023

Hi airflow community,
this is my first PR and I am happy to work on the scheduler runtime. We faced slow scheduler execution times when millions of dag_runs were queued for one DAG. This is the first PR; more are in the queue.

This PR adds .all() to the query so that the function matches its pydantic definition and returns only a list of dag_runs. This reduces scheduler runtime: without the change, the query is executed twice in the function _start_queued_dagruns in airflow/jobs/scheduler_job_runner.py.

@vandonr-amz fyi, as discussed with @jens-scheffler-bosch

@boring-cyborg

boring-cyborg bot commented Apr 18, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@vandonr-amz
Contributor

Nice small change with a big impact :)
I'm not familiar with SQLAlchemy, so I'll restate my understanding of your change here to make sure I follow what you're doing (tell me if I'm wrong):
The method next_dagruns_to_examine claimed to return a List here
https://github.com/aws-mwaa/upstream-to-airflow/blob/1ebeb19bf7542850fff2f1e2f9795ad70c1b24e2/airflow/models/dagrun.py#L294
but this type annotation was wrong, as it was returning a query, which was lazily run every time it was iterated.
So calling the method once but iterating twice over the results caused the query to be executed twice.

Adding this .all() forces evaluation on the spot and actually returns a List.

Did I get this right ?

Did you measure the improvement brought by this PR ? If so, how ? Do you have any result to share ?
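
The behavior described above can be illustrated with a minimal sketch. It does not use SQLAlchemy: `LazyQuery` is a hypothetical stand-in class, built on the standard-library `sqlite3` module, that re-runs its statement every time it is iterated, which is the behavior attributed to the query object in this thread. The table and column names are also assumptions made for the example.

```python
import sqlite3


class LazyQuery:
    """Hypothetical stand-in for a lazily evaluated ORM query (not SQLAlchemy itself)."""

    def __init__(self, conn, sql):
        self.conn = conn
        self.sql = sql
        self.executions = 0  # counts how often the SQL is actually sent to the database

    def __iter__(self):
        # Every new iteration re-runs the statement against the database.
        self.executions += 1
        return iter(self.conn.execute(self.sql).fetchall())

    def all(self):
        # Run the statement once and hand back a plain list.
        return list(self)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, state TEXT)")
conn.executemany("INSERT INTO dag_run (state) VALUES (?)", [("queued",)] * 3)

# Without .all(): two loops over the query object mean two database round trips.
lazy = LazyQuery(conn, "SELECT id FROM dag_run WHERE state = 'queued'")
first_pass = [row[0] for row in lazy]
second_pass = [row[0] for row in lazy]
lazy_executions = lazy.executions

# With .all(): the rows are materialized once and the resulting list is reused.
eager = LazyQuery(conn, "SELECT id FROM dag_run WHERE state = 'queued'")
rows = eager.all()
first_pass_eager = [row[0] for row in rows]
second_pass_eager = [row[0] for row in rows]
eager_executions = eager.executions
```

In this sketch both variants see the same rows; only the number of statement executions differs, which is the saving the PR is after.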

@jscheffl
Contributor

> Nice small change with a big impact :) I'm not familiar with SQLAlchemy, so I'll restate my understanding of your change here to make sure I follow what you're doing (tell me if I'm wrong): The method next_dagruns_to_examine claimed to return a List here https://github.com/aws-mwaa/upstream-to-airflow/blob/1ebeb19bf7542850fff2f1e2f9795ad70c1b24e2/airflow/models/dagrun.py#L294 but this type annotation was wrong, as it was returning a query, which was lazily run every time it was iterated. So calling the method once but iterating twice over the results caused the query to be executed twice.

You are right. The annotation is correct though: a list is returned, but it is lazily evaluated by SQLAlchemy.

> Adding this .all() forces evaluation on the spot and actually returns a List.

Correct.
The optimization concerns the usage at https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L1326, where lines 1329 and 1351 each iterate over the list. Because it is lazily evaluated, the query is executed twice, although the code only wants to loop twice over the (static) list.

> Did you measure the improvement brought by this PR ? If so, how ? Do you have any result to share ?

The improvement depends on your DAG queue length and DB query performance. Together with/before the other PR, this query ran for 5-15 seconds, times two. Apart from the query being sub-optimal in some cases (another PR will address this), we immediately saved 50% of the time in this section of our scheduler loop, i.e. 5-15 seconds per scheduler loop.
But if the queue is not too long, the query takes 50-100 ms, and you still save 50% of the DB effort :-D
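
As a back-of-the-envelope check of the figures above, the saving per scheduler loop is roughly the duration of one query execution. The helper below is a hypothetical illustration, assuming each execution costs about the same and that the query dominates this section of the loop.

```python
def seconds_saved_per_loop(query_seconds, executions_before=2, executions_after=1):
    """Time saved per scheduler loop when the query runs fewer times."""
    return query_seconds * (executions_before - executions_after)


def fraction_saved(executions_before=2, executions_after=1):
    """Share of query time saved, independent of how long one execution takes."""
    return (executions_before - executions_after) / executions_before


# Long queue: one execution takes 5 to 15 seconds.
long_queue_savings = (seconds_saved_per_loop(5), seconds_saved_per_loop(15))

# Short queue: one execution takes 50 to 100 ms.
short_queue_savings = (seconds_saved_per_loop(0.05), seconds_saved_per_loop(0.1))
```

Going from two executions to one always halves the query cost; the absolute gain scales with how slow the query is.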

Contributor

@vandonr-amz vandonr-amz left a comment

Nice, thank you for the detailed explanation :)
(non binding) LGTM 👍

Contributor Author

@AutomationDev85 AutomationDev85 left a comment

Does anyone have an idea why these failed in the CI?
ERROR tests/utils/test_db_cleanup.py::TestDBCleanup::test__cleanup_table[middle]
ERROR tests/utils/test_db_cleanup.py::TestDBCleanup::test__cleanup_table[end_exactly]
Are they flaky in the CI? I do not think this has anything to do with the changes in this PR. How can I trigger the CI run again?

@potiuk
Member

potiuk commented Apr 20, 2023

I re-ran it. Yes, we have a few flaky tests. We try to keep them down as much as possible, but eventually it is a matter of probability: when they fail in 1 of 500 runs or so, the chances they will get fixed are low because reproducibility is low. Usually, when a test fails in one job only and is fine in the others, it is one of the flaky ones.

Luckily we can re-run just the failed job when this happens, which is what I did. (BTW, we might simply apply a flake plugin for those kinds of tests in the near future.) This is the next improvement I have on my list.

@eladkal eladkal added this to the Airflow 2.6.1 milestone Apr 28, 2023
@uranusjr
Member

uranusjr commented May 15, 2023

(Todo for self: Code around where this function is called can use quite some typing improvements and optimisations using lazy iterators after this one is merged.)

@uranusjr
Member

Need to fix tests

@pierrejeambrun
Member

pierrejeambrun commented May 24, 2023

Relaunching failed static check, weird unrelated error on open-api-linter.

edit: Ok I see we have this problem on multiple PRs right now, will most probably fail again until we find a fix. (I believe uranusjr is working on that)

edit: #31518 should have solved that, can you rebase and try again ?

@AutomationDev85 AutomationDev85 force-pushed the feature/optimise-scheduler-run-time-1 branch from c7a4702 to e5694f0 Compare May 25, 2023 12:16
@pierrejeambrun pierrejeambrun merged commit 0fd42ff into apache:main May 25, 2023
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Jun 8, 2023
eladkal pushed a commit that referenced this pull request Jun 8, 2023
* Function returns list of dagruns and not query

* Changed pytests

* Changed all to _start_queued_dagruns

* Added comment and fixed tests

* Fixed typo

(cherry picked from commit 0fd42ff)