-
Notifications
You must be signed in to change notification settings - Fork 16.4k
dag_processing: initialize versioned bundles for callbacks (#52040) #60734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dag_processing: initialize versioned bundles for callbacks (#52040) #60734
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. The placement of the lock is appropriate and the test coverage is solid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes an issue where GitDagBundle with versioning support fails during callback execution because versioned code bundles are not initialized. The changes ensure that versioned bundles are properly initialized before callback execution and that stale bundle versions are cleaned up periodically.
Changes:
- Initialize versioned bundles when callbacks are queued
- Wrap callback execution in
BundleVersionLockto track usage and prevent cleanup races - Add periodic cleanup of stale bundle versions in the DAG processing loop
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/dag_processing/manager.py | Added bundle initialization in _add_callback_to_queue and periodic stale bundle cleanup logic |
| airflow-core/src/airflow/dag_processing/processor.py | Wrapped callback execution in BundleVersionLock context manager |
| airflow-core/tests/unit/dag_processing/test_manager.py | Added tests for bundle initialization and stale bundle cleanup |
| airflow-core/tests/unit/dag_processing/test_processor.py | Added test for BundleVersionLock usage during callback execution |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@nailo2c thanks for the PR! Copilot has suggested a couple extra paths to test, could you take a look at adding those? |
|
Thanks for the review! Let me take a look at it. |
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some nits, tried with my use case:
cat callback.py.log | grep -i succeed
{"timestamp":"2026-01-29T07:21:32.422700Z","level":"info","event":"DAG succeeded!","logger":"task.stdout"}And it works fine, thanks for looking into this @nailo2c
|
@nailo2c , I will like to include this in 3.1.7. Can you address the reviews? |
|
Hi @ephraimbuddy, thanks for the thorough review, I appreciate it. I'll check it today :) |
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
|
Hi all, thanks for all the reviews, they were very helpful! I've addressed all the feedback. If you notice anything else I can improve, please let me know :) |
Backport failed to create: v3-1-test. View the failure log Run details
You can attempt to backport this manually by running: cherry_picker cac9c0b v3-1-testThis should apply the commit to the v3-1-test branch and leave the commit in conflict state marking After you have resolved the conflicts, you can continue the backport process by running: cherry_picker --continueIf you don't have cherry-picker installed, see the installation guide. |
…60734) * dag_processing: initialize versioned bundles for callbacks (#52040) * Apply suggestion from @amoghrajesh Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com> * Apply suggestion from @amoghrajesh Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com> * Apply suggestion from @amoghrajesh Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com> * Add unit tests for callback handling and improve logging in Dag processing * fix CI static checks #2 --------- Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com> (cherry picked from commit cac9c0b)
Closes: #52040
Why
When using GitDagBundle with
supports_versioning=True, Airflow DAG Processor fails to execute DAG callbacks because it cannot find the materialized code underversions/<commit_hash>/.The DAG processor should honor
stale_bundle_cleanup_*to avoid accumulating versioned bundles.How
_add_callback_to_queue.BundleVersionLockto update usage and avoid cleanup races._cleanup_stale_bundle_versionsinDagFileProcessorManager._run_parsing_loopto periodically check and remove stale bundle versions.What
Before the fixes,
_execute_dag_callbacksraised errors and the callback logs did not appear in the DAG processor logs.Here is my remote dag repo: https://github.com/nailo2c/my_repo/blob/main/example_git_bundle_success_dag.py
After the fixes,
_execute_dag_callbackscompletes without errors, and callbacks continue to work after committing new versions.Both
on_success_callbackandon_failure_callbackbehave as expected.Was generative AI tooling used to co-author this PR?
GPT 5.2
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.