-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix triggerer deadlocks #51279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix triggerer deadlocks #51279
Conversation
|
|
|
The future pass itself looks right, however, here you still need to use the mentioned lock type, which I described in the linked comment. The approach without synchronization is appropriate only in case of full use of futures - when each I can write a separate lock implementation later if you would like. |
ah you mean lock here before threadpool https://github.com/apache/airflow/pull/51279/files#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR963 ? ? |
Yes, and in The thread-level lock approach is special in that all uses of the lock remain, but the lock itself changes, special handling for async -> sync is added. Synchronization is still needed to eliminate collisions. |
Yeah you correct, i just ran with some multiple dags i could see difference, without lock :) |
Multithreaded issues are usually hard to reproduce - it is often much easier to take a formal approach to them. This is why I would advise not to trust tests, at least not specialized ones - they can lie. |
Yeah agree :) |
7e108a1 to
8a9433c
Compare
| yield TriggerEvent({"count": dag_run_states_count, "dag_run_state": dag_run_state}) | ||
|
|
||
|
|
||
| @pytest.mark.xfail( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are passing now, xfail not required .
471110a to
34d1ba3
Compare
34d1ba3 to
3701d04
Compare
| async def connect_stdin() -> asyncio.StreamReader: | ||
| reader = asyncio.StreamReader() | ||
| protocol = asyncio.StreamReaderProtocol(reader) | ||
| await loop.connect_read_pipe(lambda: protocol, sys.stdin) | ||
| return reader | ||
|
|
||
| self.response_sock = await connect_stdin() | ||
|
|
||
| line = await self.response_sock.readline() | ||
| msg = comms_decoder.get_message() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sys.stdin is already configured to comms here https://github.com/apache/airflow/pull/51279/files#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR805.
I think its fine to read from get_message?
| global TRIGGERER_SUPERVISOR_COMMS_FUTURE | ||
| line = None | ||
|
|
||
| if TRIGGERER_SUPERVISOR_COMMS_FUTURE is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really don't love this being in here. It feels like a massive abstraction leak. I think we should instead subclass CommsDecoder into a new class defined/living somewhere with the triggerer code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, happy to do with subclass..
|
@gopidesupavan Can you explain your reason/thinking for switching to a thread pool? Generally I don't love the use of a threadpool in an async context, especially when we are just making requests (i.e. something asyncio should be really good at), so I'd really like to understand more about why this change was needed. |
Let me try to explain, since I was the initiator of this change. The problem is that synchronous and asynchronous lock calls can coexist in an asynchronous context. When an asynchronous task, holding the lock asynchronously, switches contexts, another task may try to acquire the lock synchronously (for some other request). The result is a deadlock - the attempt to acquire the lock synchronously cannot complete until the asynchronous task completes, and the asynchronous task cannot complete because the event loop is blocked by the synchronous call. ThreadPoolExecutor allows to delegate the first (asynchronous) call to a worker thread, and as a result it will be able to complete without switching to the asynchronous task, which will allow to bypass deadlock. Calling There are two cleaner solutions. The first one is to use ThreadPoolExecutor for each I will also clarify that this PR is incomplete without using a more specific type of lock that allows the async -> sync case (synchronous acquiring after asynchronous one in the same thread). |
|
Also note that it is possible to use |
|
In general, it is impossible to solve the synchronization problem between synchronous and asynchronous code in the same thread when synchronous code refers to threading, due to the specifics of cooperative multitasking implementation. Synchronous calls will always block the event loop, and the blocked event loop will prevent asynchronous tasks from executing that could have completed these synchronous calls. Turning synchronous calls into implicitly asynchronous ones (eventlet and gevent approach) leads to coroutine-safety violation. So the solutions are either to reduce this type of synchronization or to delegate execution to a worker thread. |
|
@x42005e1f Thanks for the response :). @ashb is that looks fine ? you have any suggestions are alternatives for this please? |

The triggers getting deadlock when using sync functions with sync_to_async. To avoid that we have couple of solutions discussed in here #50185.
Use the ThreadPoolExecutor to read trigger workloads and the future object will be used to wait in get_message, this will we can avoid collisions as described here #50185 (comment)
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.