AIP-67 - Multi Team: Update Celery Executor to support multi team #60675
dheerajturaga merged 16 commits into apache:main
Conversation
Updating the Celery executor to read from team-based config and also support multiple instances running concurrently. The latter is the largest source of changes. Much of the Celery configuration (both the Airflow config and the Celery config) was module based, and modules are cached and shared in Python, so the majority of the changes move that module-level config code to be function based (while also trying to maintain backwards compatibility). The way Celery tasks are sent to workers changed as a consequence: since sending tasks is parallelized across multiple processes (which do not share memory with the parent), the send-task logic now re-creates a Celery app in the subprocesses, because the pickling and unpickling that Python does to pass state to the subprocesses was not reliably producing the correct Celery app objects.
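To make the refactor concrete, here is a minimal sketch of the pattern described above, not the provider's actual code: a function-based app factory plus re-creation of the Celery app inside the sending subprocesses. The _get_config helper, its env-var fallback, and all signatures are illustrative assumptions; only the create_celery_app name comes from the commit history.

```python
from __future__ import annotations

import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from celery import Celery


def _get_config(section: str, key: str, team_id: str | None) -> str:
    # Hypothetical stand-in for a team-aware config lookup; the real provider
    # goes through Airflow's configuration machinery instead.
    suffix = f"__{team_id.upper()}" if team_id else ""
    return os.environ.get(f"AIRFLOW__{section.upper()}__{key.upper()}{suffix}", "")


def create_celery_app(team_id: str | None = None) -> Celery:
    # Build the app at call time so each executor instance (and each team)
    # gets its own configuration, instead of freezing it once at module import.
    return Celery(
        "airflow.providers.celery",
        broker=_get_config("celery", "broker_url", team_id),
        backend=_get_config("celery", "result_backend", team_id),
    )


def _send_task_in_subprocess(task_tuple, team_id: str | None = None):
    # Runs in a child process: re-create the Celery app here rather than rely on
    # pickled parent state, which did not reliably round-trip the app object.
    key, command, queue, task_name = task_tuple
    app = create_celery_app(team_id)
    result = app.send_task(task_name, args=[command], queue=queue)
    return key, result.id


def send_tasks(task_tuples, team_id: str | None = None, parallelism: int = 4):
    send = partial(_send_task_in_subprocess, team_id=team_id)
    with ProcessPoolExecutor(max_workers=parallelism) as pool:
        return list(pool.map(send, task_tuples))
```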
dheerajturaga
left a comment
Thanks for the enhancements! The current implementation introduces static type errors that will fail our CI pipeline. I have provided inline patches to correct the type signatures and ensure compliance with our MyPy configuration.
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
providers/celery/tests/integration/celery/test_celery_executor.py
@o-nikolas I have tested the general functionality of the celery worker with your changes, as well as the CLI. Things are working as expected. However, I don't see how I can test the --team flag.
Fallback to global conf if we're not running on 3.2+ airflow
As inferred by the presence of the correct ExecutorConf methods being available.
Thanks for the thorough review @dheerajturaga! I've addressed those issues (slightly differently for one than your suggested patch). I'm currently struggling with the back-compat tests. It's slow/difficult because those tests do not run successfully in breeze on my laptop, so I have to push to the PR to test each change. As far as testing with the --team flag goes: for that you have to have a full multi-team setup, which we don't have great documentation for yet (coming soon). The most helpful testing is actually on the back-compat side (testing with Airflow 2.11 and 3.1.X).
providers/celery/src/airflow/providers/celery/executors/celery_executor.py
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
dheerajturaga
left a comment
@o-nikolas, I have run backward compatibility checks. Things work well in 2.11.0; however, when I tried this with 3.1.3 I found issues. There is an incomplete API contract in the ExecutorConf class between Airflow versions.
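For illustration, a rough sketch of the capability-probing fallback discussed in this thread: probe for the team-aware method before using it, otherwise fall back to the global lookup. The team_get name and its signature are assumptions standing in for whatever the newer ExecutorConf surface actually exposes; this is not the provider's final code.

```python
def lookup(conf, section: str, key: str, team_id: str | None = None) -> str:
    # "team_get" stands in for whichever team-aware getter newer Airflow's
    # ExecutorConf exposes; probing for it keeps older Airflow working.
    if team_id and hasattr(conf, "team_get"):
        return conf.team_get(section, key, team_id=team_id)
    # Older Airflow (2.11, 3.1.x): only the global lookup is available.
    return conf.get(section, key)
```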
dheerajturaga
left a comment
Hopefully this is the final set of changes needed to be consistent. Everything else looks good.
providers/celery/src/airflow/providers/celery/cli/definition.py
providers/celery/src/airflow/providers/celery/cli/celery_command.py
Co-authored-by: Dheeraj Turaga <dheerajturaga@gmail.com>
The request for changes was left after the fixes were already applied.
dheerajturaga
left a comment
Awesome! Thanks so much for your patience, I know there was a lot of back and forth. Current changes look good!
No worries @dheerajturaga! I appreciate the thorough review and testing :) I made sure to commit one of your suggestions to get you tagged as a co-author for your efforts!
…ache#60675)

* Update Celery Executor to support multi team

  Updating the Celery executor to read from team based config and also support multiple instances running concurrently. The latter is the largest source of changes. Much of the celery configuration (both Airflow config and Celery config) was module based. Modules are cached and shared in Python. So the majority of the changes are moving that module level config code to be function based (while trying to also maintain backwards compatibility). The way Celery tasks are sent to workers also changed as a consequence of this. Since sending tasks is parallelized with multiple processes (which do not share memory with the parent) the send task logic now re-creates a celery app in the sub processes (since the pickling and unpickling that python does to try pass state to the sub processes was not reliably creating the correct celery app objects).

* Fixes from PR CI
* Mypy sometimes makes code worse
* Fallback to global conf if we're not running on 3.2+ airflow

  As inferred by the presence of the correct ExecutorConf methods being available.

* More backcompat
* Skip multi team tests if not 3.2
* 3.2 not 3.1
* Add type annotation for create_celery_app
* Conditional or TYPE_CHECKING imports of ExercutorConfig
* More type fixups
* Use explicit version compat checks rather than trying to infer
* Test back compat on celery command
* fixup
* Apply suggestions from code review

  Co-authored-by: Dheeraj Turaga <dheerajturaga@gmail.com>

* New exception for unit test

---------

Co-authored-by: Dheeraj Turaga <dheerajturaga@gmail.com>
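As a rough illustration of the "use explicit version compat checks rather than trying to infer" item in the commit list above, a provider-style version gate might look like the sketch below. The 3.2 cutoff comes from the commit titles; the constant and function names are assumptions, not the actual provider code.

```python
from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION

# Explicit cutoff instead of probing for methods; base_version strips any
# pre-release suffix so ".dev0" builds still compare correctly.
AIRFLOW_V_3_2_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.2.0")


def supports_team_config() -> bool:
    # True on Airflow 3.2+, where the team-aware config surface exists;
    # callers fall back to the global configuration otherwise.
    return AIRFLOW_V_3_2_PLUS
```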
…n test

The test constructed ExecuteTask and TaskInstance via model_construct(), which bypasses Pydantic validation. Fields added or made required by apache#50825 (dag_version_id, pool_slots) and inherited from BaseDagBundleWorkload (token, dag_rel_path, bundle_info, log_path) were missing. This went unnoticed until apache#60675 changed task dispatch to run in ProcessPoolExecutor subprocesses where mock patches don't apply, causing the real execute_workload (with full schema validation) to run on the worker.
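A minimal illustration of the model_construct() pitfall described here, using a stand-in Pydantic model rather than Airflow's real ExecuteTask/TaskInstance classes:

```python
from pydantic import BaseModel, ValidationError


class Workload(BaseModel):
    token: str
    dag_rel_path: str
    pool_slots: int


# Normal construction validates and fails fast on the missing fields.
try:
    Workload(dag_rel_path="dags/example.py")
except ValidationError as err:
    print(f"validated construction fails: {err.error_count()} errors")

# model_construct() skips validation entirely, so the same incomplete payload
# is accepted here and only surfaces as an error later, e.g. when the worker
# side re-validates the workload with the full schema.
unchecked = Workload.model_construct(dag_rel_path="dags/example.py")
print(unchecked.dag_rel_path)  # "dags/example.py", despite missing required fields
```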
Was generative AI tooling used to co-author this PR?
Cline
{pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.