Fix db clean dag version fk constraint #59525
- Convert session.query(Log).where().count() to select(func.count()).where()
- Convert session.query(...).filter().group_by() to select(...).where().group_by()
- Add airflow-core/src/airflow/jobs/ to pre-commit hook

Part of ongoing effort to migrate to SQLAlchemy 2.0 style. Related to deprecation of Query object in SQLAlchemy 2.0.
…cheduler_job.py

- Convert session.query() to session.scalars(select()) for model queries
- Convert session.query(func.count()) to session.scalar(select(func.count()))
- Convert .filter() to .where()
- Convert .delete() to delete().where() with execution_options
- Add 'delete' import to SQLAlchemy imports
- Fixed 25+ more occurrences in test_scheduler_job.py
- Still ~25+ occurrences remaining (mostly count queries)
- Fixed asset1_id query (line 4468)
- Convert TaskInstance join query (line 5049)
- Convert DagRun order_by query (line 5096)
- Progress: 28+ occurrences fixed total
- Remaining: ~40+ count queries and filter_by patterns

- Fixed 7 more count query patterns in test_scheduler_job.py
- Converted session.query(func.count()) to session.scalar(select(func.count()))
- Progress: ~35 of 70+ occurrences now fixed
- Converted ALL remaining deprecated session.query() patterns to select() API
- Fixed 70+ occurrences total in test_scheduler_job.py
- Fixed 2 occurrences in scheduler_job_runner.py
- Updated pre-commit config to enforce SQLAlchemy 2.0 style
- Added 'delete' import for delete() operations

Migration patterns applied:

- session.query(Model).filter() -> session.scalars(select(Model).where())
- session.query(Model).first/one/all() -> session.scalars(select(Model)).first/one/all()
- session.query(func.count()).scalar() -> session.scalar(select(func.count()))
- session.query(Model).count() -> session.scalar(select(func.count()).select_from(Model))
- session.query(Model).delete() -> session.execute(delete(Model))
- Multiline queries with .filter().scalar() -> corresponding .where() patterns

All deprecated Query API usage removed from jobs module.
… active task_instances

This commit fixes a foreign key constraint violation that occurs when running the 'airflow db clean' command. The issue happened when:

- A dag_version row is old (based on created_at)
- A task_instance row is recent (based on start_date) but references the old dag_version
- The cleanup tries to delete dag_version first, violating the RESTRICT FK constraint

The problem was that dag_version incorrectly listed task_instance in its dependent_tables configuration. However, task_instance has a foreign key TO dag_version (task_instance.dag_version_id -> dag_version.id), making task_instance the referrer, not the dependent.

Solution:

- Removed task_instance from dag_version's dependent_tables list
- This prevents dag_version rows from being deleted when they are still referenced by task_instance rows that don't meet deletion criteria

Added test case to verify that dag_version rows are properly preserved when referenced by active task_instances.

Fixes: apache#44791
If any conflicts or issues come up during the review, let me know and I will make the changes.
henry3260 left a comment
It looks like some unrelated changes were included by mistake. Could you please remove them? Thanks!
      )
    - query = session.query(Log).where(
    + stmt = select(func.count()).select_from(Log).where(
This change looks unrelated to this PR.
      if last_running_time is not None:
    -     query = query.where(Log.dttm > last_running_time)
    +     stmt = stmt.where(Log.dttm > last_running_time)
same
This PR contains work from another PR (#59407) mixed in.
Okay, I will look into what's wrong and follow up with updates.
The fact that you have 2 PRs with the same PR title doesn't instil confidence in your PRs anymore. @Arunodoy18, I am going to close your PRs. Please review and test your changes, and use a correct PR description. Using LLMs without doing so increases maintenance burden and CI run time. Feel free to recreate focused PRs following those guidelines.
Fixes #44791
What changes were proposed in this pull request?

Fixed a foreign key constraint violation in the `airflow db clean` command that occurred when attempting to delete `dag_version` rows that are still referenced by active `task_instance` rows.

Why are the changes needed?

When running `airflow db clean` with a date that causes old `dag_version` rows (based on `created_at`) to be deleted while recent `task_instance` rows (based on `start_date`) that reference them are kept, an IntegrityError is raised due to the `task_instance_dag_version_id_fkey` RESTRICT constraint.

The issue was that `dag_version` incorrectly listed `task_instance` in its `dependent_tables` configuration, suggesting a parent-child relationship that doesn't exist. In reality, `task_instance` has a foreign key TO `dag_version`, making it the referrer, not the dependent.

How was this patch tested?

- Added test `test_dag_version_not_deleted_when_task_instance_references_it` that reproduces the scenario and verifies the fix
- The test creates an old `dag_version` and a recent `task_instance` referencing it, then runs cleanup to ensure no IntegrityError occurs

Are there any user-facing changes?

No, this is a bug fix that prevents an error. The behavior now correctly respects foreign key constraints.
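The failure mode described above can be reproduced outside Airflow. The following is a minimal sketch using Python's built-in `sqlite3` module, not Airflow's actual cleanup code: the two-table schema is a simplified stand-in for the real models, and `build_db`, `naive_clean`, and `safe_clean` are hypothetical helper names introduced only for illustration. It shows that deleting an old `dag_version` row fails under a RESTRICT foreign key while a kept `task_instance` row still references it, and one possible way to skip referenced rows.

```python
import sqlite3


def build_db() -> sqlite3.Connection:
    """Create a simplified, illustrative schema (not Airflow's real tables)."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when this pragma is enabled.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE dag_version (
            id INTEGER PRIMARY KEY,
            created_at TEXT NOT NULL
        );
        CREATE TABLE task_instance (
            id INTEGER PRIMARY KEY,
            start_date TEXT NOT NULL,
            dag_version_id INTEGER
                REFERENCES dag_version(id) ON DELETE RESTRICT
        );
        -- Two old dag_version rows; only row 1 is still referenced.
        INSERT INTO dag_version VALUES (1, '2024-01-01'), (2, '2024-01-02');
        -- A recent task_instance pointing at the old dag_version row 1.
        INSERT INTO task_instance VALUES (1, '2025-06-01', 1);
        """
    )
    return conn


def naive_clean(conn: sqlite3.Connection, cutoff: str) -> None:
    """Delete by age only; raises IntegrityError if a row is still referenced."""
    conn.execute("DELETE FROM dag_version WHERE created_at < ?", (cutoff,))


def safe_clean(conn: sqlite3.Connection, cutoff: str) -> int:
    """Delete old rows that no task_instance references; return rows deleted."""
    cur = conn.execute(
        """
        DELETE FROM dag_version
        WHERE created_at < ?
          AND id NOT IN (
              SELECT dag_version_id FROM task_instance
              WHERE dag_version_id IS NOT NULL
          )
        """,
        (cutoff,),
    )
    return cur.rowcount


conn = build_db()
try:
    naive_clean(conn, "2025-01-01")
except sqlite3.IntegrityError as exc:
    print(f"naive cleanup failed: {exc}")
    conn.rollback()

deleted = safe_clean(conn, "2025-01-01")
print(f"safe cleanup deleted {deleted} row(s)")
```

The `NOT IN` subquery is only one way to respect the constraint; the PR itself describes a configuration change (removing `task_instance` from the `dependent_tables` list of `dag_version`) rather than this query.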