Avoid scheduler/parser manager deadlock by using non-blocking IO #15112
Conversation
/cc @uranusjr
There have been long-standing issues where the scheduler would "stop
responding" that we haven't been able to track down.
Someone was able to catch the scheduler in this state in 2.0.1 and
inspect it with py-spy (thanks, MatthewRBruce!)
The stack traces (slightly shortened) were:
```
Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
_send (multiprocessing/connection.py:368)
_send_bytes (multiprocessing/connection.py:411)
send (multiprocessing/connection.py:206)
send_callback_to_execute (airflow/utils/dag_processing.py:283)
_send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
_schedule_dag_run (airflow/jobs/scheduler_job.py:1762)
Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
_send (multiprocessing/connection.py:368)
_send_bytes (multiprocessing/connection.py:405)
send (multiprocessing/connection.py:206)
_run_parsing_loop (airflow/utils/dag_processing.py:698)
start (airflow/utils/dag_processing.py:596)
```
What this shows is that both processes are stuck trying to send data to
each other, but neither can proceed because both send buffers are full;
and since both are trying to send, neither side will ever read and free
up space in the buffer. A classic deadlock!
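For illustration, here is a minimal standalone reproduction of the same pattern (a demo, not Airflow code): two processes that only ever send on a `multiprocessing.Pipe` will both block forever once the pipe buffers fill up.

```python
import multiprocessing


def child(conn, payload):
    # The child only sends, just like the DagFileProcessorManager in the
    # trace above; it never reads from its end of the pipe.
    while True:
        conn.send(payload)


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    payload = "x" * 65536
    multiprocessing.Process(target=child, args=(child_conn, payload), daemon=True).start()

    # The parent also only sends.  Once both kernel buffers are full, both
    # processes are stuck inside send() waiting for space that will never
    # appear -- the deadlock shown in the py-spy traces.
    while True:
        parent_conn.send(payload)
```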
The fix for this is twofold:
1) Enable non-blocking IO on the DagFileProcessorManager side.
The only thing the Manager sends back up the pipe (now, as of 2.0) is
the DagParsingStat object, and the scheduler will happily continue
without receiving these, so if the send would block it is simply
better to ignore the error, continue the loop and try sending again
later (see the sketch after this list).
2) Reduce the size of DagParsingStat
With a large number of dag files, we included the full path of each
and every one in _each_ parsing stat. The scheduler did nothing with
this field, so the stat was larger than it needed to be, and such a
large object increases the likelihood of hitting this
send-buffer-full deadlock case!
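A minimal sketch of what part 1 could look like, assuming the manager's end of the pipe is a regular multiprocessing `Connection` whose file descriptor can be switched to non-blocking with `os.set_blocking` (the helper name and variables are illustrative, not the exact Airflow code):

```python
import os
from multiprocessing import Pipe

# Hypothetical stand-ins for the scheduler/manager ends of the signal pipe.
scheduler_conn, manager_conn = Pipe()

# Put the manager's end into non-blocking mode (Unix): a send into a full
# pipe buffer now raises BlockingIOError instead of blocking forever.
os.set_blocking(manager_conn.fileno(), False)


def send_parsing_stat(conn, dag_parsing_stat):
    """Best-effort send: if the scheduler isn't draining the pipe, skip it."""
    try:
        conn.send(dag_parsing_stat)
    except BlockingIOError:
        # The stat is purely advisory; drop it and try again on the next
        # pass of the parsing loop rather than deadlocking.
        pass
```

For part 2, the idea is simply to drop the per-file paths from DagParsingStat and carry only small aggregate fields, so the pickled message sent over the pipe stays small.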
airflow/utils/dag_processing.py (outdated diff)
```diff
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
```
I'm not sure: does applying a timeout context here make sense?
The reason is that it may try to send and get stuck without failing explicitly, so making it fail after a specific timeout might help.
May be a dumb question.
Since these are local sockets, with the change to non-blocking mode it will either succeed or fail entirely, "instantly", so we don't need a timeout, no.
It wouldn't be a bad idea to add a timeout, but that's extremely rarely an issue for local sockets (things need to get pretty wrong for that to happen at all). This can always be revisited if someone ever complains.
On Linux these are AF_UNIX sockets too, so it's even harder to get anything other than an instant fail/success in non-blocking mode, I think.
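A quick standalone way to see that behaviour (a demo, not Airflow code): on Unix a duplex `multiprocessing.Pipe` is backed by a socketpair, so once the buffer is full a non-blocking send fails immediately instead of hanging.

```python
import multiprocessing
import os

# Demo: fill one end of a duplex Pipe while nobody reads the other end.
a, b = multiprocessing.Pipe(duplex=True)
os.set_blocking(a.fileno(), False)

chunk = b"x" * 1024
sent = 0
try:
    while True:
        a.send_bytes(chunk)  # keep writing until the kernel buffer is full
        sent += len(chunk)
except BlockingIOError:
    print(f"send failed instantly after ~{sent} bytes instead of blocking")
```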
```diff
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
+            except BlockingIOError:
```
Any possibility of other error types that we may want to ignore? Again, maybe a dumb question.
I don't think so, no. Anything else is an "error" condition, and we want to die (and the scheduler will notice and restart the manager process).
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
Co-authored-by: Xiaodong DENG <xd.deng.r@gmail.com>
Closes #7935, #15037 (I hope!)
Thanks to @uranusjr for making the draft PR that pointed me to this solution in the first place.