-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Description
Apache Airflow version
main (development)
Operating System
Unix with 'spawn' method, MacOS, Windows (spawn is default there)
Versions of Apache Airflow Providers
Not relevant
Deployment
Other
Deployment details
Any deployment with 'spawn' mutliprocessing mode method used is susceptible to this problem. This is configured via (undocumented) feature on Linux CORE__MP_START_METHOD="spawn" but this is also default method for Windows and MacOS envieronments.
What happened
Whenever the DagFileProcessor performs relod of airflow settings (always when it starts), it also re-initializes SQLAlchemy/ORM engine. This reinitialization in spawn mode of multiprocessing has unintended side effect - it corrupts the SQLAlchemy session data in rather unpredictable ways. In our CI tests it resulted with making objects created after the reload unavailable to the session that queried for those objects.
This is particularly disruptive as the "parent" process for DagProcessor manager currently is Scheduler, which heavlly relies on lots of ORM queries and any corruption of the session data might lead to extremely difficult to diagnose and debug problems, especially that such reload might happen asynchronously.
Note - since the only "production" deployment of Airflow is Linux, and this feature is undocumented, it is highly unlikely this problem will cause a problem in production. However in our CI environment and in environments where people use Airflow for development, it is quite likely to happen. This might already cause stability issues for MacOS users (and in the future Windows users) who run Airlfow locally to run tests rather than use Airflow in Production. While this is lower priority, it's still big enough and important group of users so that this problem should be fixed.
What you expected to happen
We expect the "reload" is not happening - and specifically that the ORM engine will not be reinitialized when spawned DagProcesorManager starts. This is only needed currently because the logging configuration needs to be recreated, specifically for DAG processor.
Note: In the future, when dag processor is separated to a separate process/command line (planned for the multi-tenant work), this will not be a problem. But the mode where DAG processor remains child process of Airflow is likely to stay, so we should fix it.
How to reproduce
- Checkout the code of Airflow from 30 November 2021
- Add
sleep(2)command to initialization of the_run_processor_managerinairflow/dag_processing/manager.py
@staticmethod
def _run_processor_manager(
dag_directory: str,
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: Optional[List[str]],
pickle_dags: bool,
async_mode: bool,
) -> None:
time.sleep(2) # <-- add the liine here
# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
os.setpgid(0, 0)
- Add
sleep(10) command in thetest_scheduler_keeps_scheduling_pool_fulloftests/jobs/test_scheduler_job.py`
# Create 5 dagruns for each DAG.
# To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
# TIs from the first dag first.
for dr in _create_dagruns(dag_d1):
scheduler._schedule_dag_run(dr, session)
time.sleep(10) # <- Add sleep command here
for dr in _create_dagruns(dag_d2):
scheduler._schedule_dag_run(dr, session)
Run pytest tests in the following way:
pytest tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_multiprocessing_with_spawn_method tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_keeps_scheduling_pool_full -s
Result:
In the logs you will also see the below, which is the manifestation of session being broken. The object were added few lines before, but since they were added already after the reloaad hae
[2021-12-01 13:08:52,842] {scheduler_job.py:1009} ERROR - Couldn't find dag test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,848] {scheduler_job.py:1009} ERROR - Couldn't find dag test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,854] {scheduler_job.py:1009} ERROR - Couldn't find dag test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,861] {scheduler_job.py:1009} ERROR - Couldn't find dag test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,867] {scheduler_job.py:1009} ERROR - Couldn't find dag test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
What happens here:
- the code in Aiflow before December 2021 had a bug (fixed at the beginning of December) where dag file processor has not been killed if it was slow to start (race condition)
- by adding sleep() command we are forcing the race condition to be reproducible
- the first test
test_scheduler_multiprocessing_with_spawn_methodfinished, but due to the race condition DAGProcessor has not been terminated when the test finished - the second test started but paused for longer between two "create dagrun" loops
- the first dagrun completed before reload
- the reload happened in the spawned process (after 2s)
- the second dagrun completed after the reload (after 10s)
- The dagruns created in the second loop were missing when the subsequent code in scheduler job tried to retrieve them. the dagruns created before - were accessible.
Anything else
Very long investigation, but I am happy we found it. The fix is coming to the "race condition" and the "spawn" tests were moved to where they belong (dag_processor) with some more test cases added: #19935. However further changes are needed to solve the root cause - reload of the ORM should not happen in the spawned process.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
