Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Jun 13, 2025

Relates to #46426

The existing JSON Lines based approach had three major drawbacks

  1. In the case of really large lines (in the region of 10 or 20MB) the python
    line buffering could sometimes result in a partial read
  2. The JSON based approach didn't have the ability to add any metadata (such
    as errors).
  3. Not every message type/call-site waited for a response, which meant those
    client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single send() that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of socketpair and save file
handles. This also happens to help the run_as_user feature which is
currently broken, as without extra config to sudoers file, sudo will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
on_close callback to be part of the object we store in the selector object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of (on_read, on_close) and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have send on
TriggerCommsDecoder "go back" to the async even loop via async_to_sync, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change.

Fixes #50185, fixes #51213, closes #51279

@ashb
Copy link
Member Author

ashb commented Jun 13, 2025

@gopidesupavan I took your example test dags external_task_sensor_parent and external_task_sensor_child_dag (I can't find which issue you put them in anymore) and it was able to run the catch up and have all 3 years of daily dags run fine without deadlock.

I'm not 100% sure I've removed the deadlock (such is the issue with multi-threading) but I can no longer reproduce it where as previously it would lock after a few seconds.

In short this appears to help the problem, but all of the discussion in the issue about this makes me think that it shouldn't fix it.

@ashb ashb force-pushed the rework-tasksdk-supervisor-comms-protocol branch from 1322bb6 to 22a44cb Compare June 13, 2025 16:48
@x42005e1f
Copy link

x42005e1f commented Jun 13, 2025

In short this appears to help the problem, but all of the discussion in the issue about this makes me think that it shouldn't fix it.

I think the same way. If async_to_sync() is called in the same thread as the event loop (which was a deadlock condition), it will raise RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. The problem may not show up either because of insufficient test coverage or because this PR eliminates the situation where synchronous and asynchronous send()'s (TriggerCommsDecoder methods) are mixed in the same thread.

@ashb
Copy link
Member Author

ashb commented Jun 16, 2025

Ahhh I know why it helps then.

The main thread/outer is the async event loop. It calls in to get_connection via sync_to_async so it is running in a thread. The new change in this PR now means that TriggerCommsDecoder calls async_to_sync(self.asend) -- which then "bubbles up" to the main thread, and it is now only the async code that locks things, so the sync thread is never trying to achieve the lock directly.

@ashb ashb force-pushed the rework-tasksdk-supervisor-comms-protocol branch from 22a44cb to 4975725 Compare June 16, 2025 13:25
@ashb ashb force-pushed the rework-tasksdk-supervisor-comms-protocol branch from a4963a5 to bad9e3a Compare June 16, 2025 15:32
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

@kaxil kaxil added this to the Airflow 3.0.3 milestone Jun 16, 2025
@ashb ashb force-pushed the rework-tasksdk-supervisor-comms-protocol branch 2 times, most recently from 0f38c3b to f45647a Compare June 16, 2025 21:25
Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1.

github-actions bot pushed a commit that referenced this pull request Jun 18, 2025
This should have been handled in #51699 but was missed there as it is very
infrequently hit.
(cherry picked from commit b8b7f4a)

Co-authored-by: Ash Berlin-Taylor <ash@apache.org>
github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 18, 2025
…51885)

This should have been handled in apache#51699 but was missed there as it is very
infrequently hit.
(cherry picked from commit b8b7f4a)

Co-authored-by: Ash Berlin-Taylor <ash@apache.org>
jedcunningham pushed a commit that referenced this pull request Jun 18, 2025
…#51895)

This should have been handled in #51699 but was missed there as it is very
infrequently hit.
(cherry picked from commit b8b7f4a)

Co-authored-by: Ash Berlin-Taylor <ash@apache.org>
ashb added a commit that referenced this pull request Jun 19, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 19, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 19, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 19, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 20, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 20, 2025
…#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
except Exception:
log.exception("Unable to decode message", line=line)
log.exception("Unable to decode message", body=request.body)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we are continuing? i think if we unable to decode we should respond ?

@gopidesupavan
Copy link
Member

Thanks for the rework, and updating triggers.

@gopidesupavan
Copy link
Member

Ahhh I know why it helps then.

The main thread/outer is the async event loop. It calls in to get_connection via sync_to_async so it is running in a thread. The new change in this PR now means that TriggerCommsDecoder calls async_to_sync(self.asend) -- which then "bubbles up" to the main thread, and it is now only the async code that locks things, so the sync thread is never trying to achieve the lock directly.

Yes exactly..

RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Jun 21, 2025
…apache#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.
RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Jun 21, 2025
This should have been handled in apache#51699 but was missed there as it is very
infrequently hit.
ashb added a commit that referenced this pull request Jun 23, 2025
…gth-prefixed (#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 23, 2025
…gth-prefixed (#51699)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
ashb added a commit that referenced this pull request Jun 23, 2025
…gth-prefixed (#51699) (#51924)

* Switch the Supervisor/task process from line-based to length-prefixed

The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change

* Deal with compat in tests

This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.

(cherry picked from commit 492518e)
Lee-W added a commit to astronomer/airflow that referenced this pull request Jun 23, 2025
Lee-W added a commit that referenced this pull request Jun 23, 2025
@kaxil
Copy link
Member

kaxil commented Jun 25, 2025

#protm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Airflow DAG trigger with Amazon SQS Asset watcher trigger not working properly Trigger runner process locked with multiple Workflow triggers

6 participants