Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Jan 10, 2025

When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
even more unhappy.)

This more correctly separates the two classes -- essentially anything that
used self.client couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new ActivitySubprocess which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.

Closes #45537 as it is an alternative implementation of addressing the same problem


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:task-sdk labels Jan 10, 2025
self.parsing_result = msg
return
# GetVariable etc -- parsing a dag can run top level code that asks for an Airflow Variable
super()._handle_request(msg, log)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would have never worked, it would have tried to use self.client which is None and would have thrown an exception

)

# Tell the task process what it needs to do!
proc._on_child_started(what, path, requests_fd)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this is now the responsibility of the subclasses overridden start methods


self._handle_request(msg, log)

def _handle_request(self, msg, log: FilteringBoundLogger) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msg here is not typed in the base class on purpose.

When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
_even more unhappy_.)

This more correctly separates the two classes -- essentially anything that
used `self.client` couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new `ActivitySubprocess` which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.
@ashb ashb force-pushed the watchprocess-subclasses branch from 83c099e to bfd88fb Compare January 10, 2025 22:04
@ashb
Copy link
Member Author

ashb commented Jan 10, 2025

Tests are failing with this:

FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[backfill-failed] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[backfill-success] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[backfill-queued] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[scheduled-failed] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[scheduled-success] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[scheduled-queued] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[manual-failed] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[manual-success] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[manual-queued] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[asset_triggered-failed] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[asset_triggered-success] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'
FAILED tests/api_connexion/endpoints/test_dag_run_endpoint.py::TestPatchDagRunState::test_action_logging[asset_triggered-queued] - TypeError: bag_dag() got an unexpected keyword argument 'root_dag'

which I think main is suffering from too.

@ashb
Copy link
Member Author

ashb commented Jan 10, 2025

Yup, issue is unrelated, fix is #45572.

Merging this

@ashb ashb merged commit a2d5a25 into apache:main Jan 10, 2025
42 of 46 checks passed
@ashb ashb deleted the watchprocess-subclasses branch January 10, 2025 22:51
kaxil added a commit to astronomer/airflow that referenced this pull request Jan 11, 2025
Follow-up of apache#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
kaxil added a commit that referenced this pull request Jan 11, 2025
Follow-up of #45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Jan 13, 2025
When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
_even more unhappy_.)

This more correctly separates the two classes -- essentially anything that
used `self.client` couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new `ActivitySubprocess` which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Jan 13, 2025
Follow-up of apache#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
HariGS-DB pushed a commit to HariGS-DB/airflow that referenced this pull request Jan 16, 2025
When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
_even more unhappy_.)

This more correctly separates the two classes -- essentially anything that
used `self.client` couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new `ActivitySubprocess` which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.
HariGS-DB pushed a commit to HariGS-DB/airflow that referenced this pull request Jan 16, 2025
Follow-up of apache#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
dauinh pushed a commit to dauinh/airflow that referenced this pull request Jan 24, 2025
When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
_even more unhappy_.)

This more correctly separates the two classes -- essentially anything that
used `self.client` couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new `ActivitySubprocess` which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.
dauinh pushed a commit to dauinh/airflow that referenced this pull request Jan 24, 2025
Follow-up of apache#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
When I added the DagFileProcessorProcess I did some naughty things with
subclassing and lying to the type checker, and this is now stopping us easily
adding DAG bundles (because as it is structured right now we would have to
change both parsing and execution at the same time, or make the type checker
_even more unhappy_.)

This more correctly separates the two classes -- essentially anything that
used `self.client` couldn't have been called from a DagFileProcessorProcess
(as client was always None for those instances).

This PR fixes it by adding a new `ActivitySubprocess` which is the type used
at Execution time (the one that always has the client) and the base behaviour
kept in WatchedSubprocess.
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
Follow-up of apache#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request May 28, 2025
Follow-up of apache/airflow#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`

GitOrigin-RevId: 168f76562e617cd8e8a41ec684bfda09db6b5705
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Sep 23, 2025
Follow-up of apache/airflow#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`

GitOrigin-RevId: 168f76562e617cd8e8a41ec684bfda09db6b5705
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Oct 21, 2025
Follow-up of apache/airflow#45570 to add some Typehints so my editor (PyCharm) can find `_on_child_started`.

Added/Updated docstrings since `WatchedSubprocess` is now a Base class.

and removed redundant exception handling for `init_log_file`

GitOrigin-RevId: 168f76562e617cd8e8a41ec684bfda09db6b5705
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants