From 4c2502bac53c18d73a59296b762d4b4a81ac1a9d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 20 Jun 2025 09:44:42 +0530 Subject: [PATCH 01/15] enhance cleanup logic to archive dependent tables first --- airflow-core/src/airflow/utils/db_cleanup.py | 133 ++++++++++++++++--- 1 file changed, 113 insertions(+), 20 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 02b0307523251..f47e39c253ebd 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -26,6 +26,7 @@ import csv import logging import os +from collections import defaultdict, deque from contextlib import contextmanager from dataclasses import dataclass from typing import TYPE_CHECKING, Any @@ -127,7 +128,6 @@ def readable_config(self): _TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"), _TableConfig(table_name="trigger", recency_column_name="created_date"), _TableConfig(table_name="dag_version", recency_column_name="created_at"), - _TableConfig(table_name="deadline", recency_column_name="deadline_time"), ] # We need to have `fallback="database"` because this is executed at top level code and provider configuration @@ -169,6 +169,43 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str raise AirflowException(f"Export format {export_format} is not supported.") +def get_all_dependent_tables(root_table: str, session: Session) -> list[dict]: + inspector = inspect(session.get_bind()) + config_table_names = set(config_dict.keys()) + visited = set() + dependent_fks = [] + + def visit(table_name: str): + if table_name in visited: + return + visited.add(table_name) + + for other_table in inspector.get_table_names(): + if other_table not in config_table_names: + continue + + for fk in inspector.get_foreign_keys(other_table): + if ( + fk.get("referred_table") == table_name + and fk.get("constrained_columns") + and fk.get("referred_columns") + ): + dependent_fks.append( + { + "table_name": other_table, + "fk_column": fk["constrained_columns"][0], + "referred_table": table_name, + "referred_column": fk["referred_columns"][0], + } + ) + # Recurse into the next level + visit(other_table) + + visit(root_table) + print(f"dependent_fks are {dependent_fks}") + return dependent_fks + + def _do_delete( *, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None ) -> None: @@ -313,6 +350,33 @@ def _build_query( return query +def topologically_sort_tables(table_names, session: Session) -> list[str]: + inspector = inspect(session.get_bind()) + graph = defaultdict(set) + reverse_graph = defaultdict(set) + + # Build graph of FK relationships + for table_name in table_names: + for fk in inspector.get_foreign_keys(table_name): + referred = fk.get("referred_table") + if referred in table_names: + graph[table_name].add(referred) + reverse_graph[referred].add(table_name) + + no_deps = deque([t for t in table_names if not graph[t]]) + sorted_order = [] + + while no_deps: + table_name = no_deps.popleft() + sorted_order.append(table_name) + for dependent in reverse_graph[table_name]: + graph[dependent].remove(table_name) + if not graph[dependent]: + no_deps.append(dependent) + + return list(reversed(sorted_order)) + + def _cleanup_table( *, orm_model, @@ -331,27 +395,56 @@ def _cleanup_table( print() if dry_run: print(f"Performing dry run for table {orm_model.name}") - query = _build_query( - orm_model=orm_model, - 
recency_column=recency_column, - keep_last=keep_last, - keep_last_filters=keep_last_filters, - keep_last_group_by=keep_last_group_by, - clean_before_timestamp=clean_before_timestamp, - session=session, - ) - logger.debug("old rows query:\n%s", query.selectable.compile()) - print(f"Checking table {orm_model.name}") - num_rows = _check_for_rows(query=query, print_rows=False) - - if num_rows and not dry_run: - _do_delete( - query=query, - orm_model=orm_model, - skip_archive=skip_archive, + + def _cleanup_single_table(model, recency_col, keep_last_args=None): + print(f"Checking table {model.name}") + query = _build_query( + orm_model=model, + recency_column=recency_col, + keep_last=keep_last_args.get("keep_last") if keep_last_args else None, + keep_last_filters=keep_last_args.get("keep_last_filters") if keep_last_args else None, + keep_last_group_by=keep_last_args.get("keep_last_group_by") if keep_last_args else None, + clean_before_timestamp=clean_before_timestamp, session=session, - batch_size=batch_size, ) + logger.debug("old rows query:\n%s", query.selectable.compile()) + num_rows = _check_for_rows(query=query, print_rows=False) + + if num_rows and not dry_run: + _do_delete( + query=query, + orm_model=model, + skip_archive=skip_archive, + session=session, + batch_size=batch_size, + ) + + # Get all recursive dependencies including children-of-children + dependent_fks = get_all_dependent_tables(orm_model.name, session) + + # Extract table names and topologically sort deepest first + dep_table_names = [fk["table_name"] for fk in dependent_fks] + sorted_tables = topologically_sort_tables(dep_table_names, session) + print(f"sorted_tables are {sorted_tables}") + + # Cleanup all dependent tables first + for table_name in sorted_tables: + metadata = reflect_tables([table_name], session) + model = metadata.tables[table_name] + recency_col_name = config_dict[model.name].recency_column_name + recency_col = column(recency_col_name) + _cleanup_single_table(model, recency_col) + + # Cleanup original table + _cleanup_single_table( + orm_model, + recency_column, + keep_last_args={ + "keep_last": keep_last, + "keep_last_filters": keep_last_filters, + "keep_last_group_by": keep_last_group_by, + }, + ) session.commit() From 2012c88de15585e7f8ce5611066de004964899a9 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 20 Jun 2025 16:37:12 +0530 Subject: [PATCH 02/15] adding flag to just archive fk tables and let cascade take care of deletion --- airflow-core/src/airflow/utils/db_cleanup.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index f47e39c253ebd..c6f26f0a2e46e 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -207,7 +207,13 @@ def visit(table_name: str): def _do_delete( - *, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None + *, + query: Query, + orm_model: Base, + skip_archive: bool, + session: Session, + batch_size: int | None, + skip_delete: bool = False, ) -> None: import itertools import re @@ -268,9 +274,13 @@ def _do_delete( delete = source_table.delete().where( and_(col == target_table.c[col.name] for col in source_table.primary_key.columns) ) + if skip_delete: + print(f"Skipping deletion for FK table {orm_model.name}; relying on ON DELETE CASCADE.") + return logger.debug("delete statement:\n%s", delete.compile()) session.execute(delete) session.commit() + 
except BaseException as e: raise e finally: @@ -396,7 +406,7 @@ def _cleanup_table( if dry_run: print(f"Performing dry run for table {orm_model.name}") - def _cleanup_single_table(model, recency_col, keep_last_args=None): + def _cleanup_single_table(model, recency_col, keep_last_args=None, skip_delete=False): print(f"Checking table {model.name}") query = _build_query( orm_model=model, @@ -417,6 +427,7 @@ def _cleanup_single_table(model, recency_col, keep_last_args=None): skip_archive=skip_archive, session=session, batch_size=batch_size, + skip_delete=skip_delete, ) # Get all recursive dependencies including children-of-children @@ -433,7 +444,7 @@ def _cleanup_single_table(model, recency_col, keep_last_args=None): model = metadata.tables[table_name] recency_col_name = config_dict[model.name].recency_column_name recency_col = column(recency_col_name) - _cleanup_single_table(model, recency_col) + _cleanup_single_table(model, recency_col, skip_delete=True) # Cleanup original table _cleanup_single_table( From 0ff24d74b52eaa51426e1288c016195bf9714ed7 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 23 Jun 2025 10:52:48 +0530 Subject: [PATCH 03/15] add test --- .../tests/unit/utils/test_db_cleanup.py | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 00c1cf4efd442..e48f730085c5f 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -26,7 +26,7 @@ import pendulum import pytest -from sqlalchemy import text +from sqlalchemy import inspect, text from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.ext.declarative import DeclarativeMeta @@ -46,6 +46,7 @@ config_dict, drop_archived_tables, export_archived_records, + get_all_dependent_tables, run_cleanup, ) from airflow.utils.session import create_session @@ -303,6 +304,58 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, r else: raise Exception("unexpected") + @pytest.mark.parametrize( + "table_name, expected_deps, expected_archived", + [ + ( + "dag_run", + {"task_instance", "xcom", "task_reschedule", "task_instance_history"}, + {"dag_run", "task_instance"}, # Only these are populated + ), + ], + ) + def test_fk_detection_and_archival_integration(self, table_name, expected_deps, expected_archived): + """ + Integration test that verifies: + + 1. All expected foreign key dependent tables are detected for a given table (e.g., dag_run). + 2. The cleanup logic correctly archives only those tables that actually contain data. + 3. Archive tables are created only for populated tables, and not for empty FK dependencies. + + Steps: + - Use `create_tis()` to populate dag_run and task_instance tables. + - Detect FK dependencies for `table_name` and assert all expected ones are found. + - Run `_cleanup_table()` with a future timestamp to ensure all rows are archived. + - Inspect database to check which archive tables were actually created. + - Assert that only tables with data were archived. 
+ """ + base_date = pendulum.datetime(2022, 1, 1, tz="UTC") + + num_tis = 5 + create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL) + + with create_session() as session: + fk_deps = get_all_dependent_tables(table_name, session) + dep_table_names = {fk["table_name"] for fk in fk_deps} + assert expected_deps <= dep_table_names, f"Missing FK tables: {expected_deps - dep_table_names}" + + clean_before_date = base_date.add(days=10) + _cleanup_table( + **config_dict[table_name].__dict__, + clean_before_timestamp=clean_before_date, + dry_run=False, + session=session, + table_names=[table_name], + ) + + inspector = inspect(session.bind) + archive_tables = set(t for t in inspector.get_table_names() if t.startswith(ARCHIVE_TABLE_PREFIX)) + actual_archived = {t.split("__", 1)[-1].split("__")[0] for t in archive_tables} + + assert expected_archived <= actual_archived, ( + f"Expected archive tables not found: {expected_archived - actual_archived}" + ) + @pytest.mark.parametrize( "skip_archive, expected_archives", [pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, id="do_archive")], From 1f5f48114ff1b3cc7dfa0e261b934d1ddba2bec8 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 23 Jun 2025 17:16:39 +0530 Subject: [PATCH 04/15] Adding hardcoded FK details for tables --- airflow-core/src/airflow/utils/db_cleanup.py | 124 ++++++------------ .../tests/unit/utils/test_db_cleanup.py | 11 +- 2 files changed, 44 insertions(+), 91 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index c6f26f0a2e46e..975f2adab78a4 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -26,7 +26,6 @@ import csv import logging import os -from collections import defaultdict, deque from contextlib import contextmanager from dataclasses import dataclass from typing import TYPE_CHECKING, Any @@ -74,6 +73,7 @@ class _TableConfig: in the table. to ignore certain records even if they are the latest in the table, you can supply additional filters here (e.g. 
externally triggered dag runs) :param keep_last_group_by: if keeping the last record, can keep the last record for each group + :param dependent_tables: list of tables which have FK relationship with this table """ table_name: str @@ -82,6 +82,7 @@ class _TableConfig: keep_last: bool = False keep_last_filters: Any | None = None keep_last_group_by: Any | None = None + dependent_tables: list[str] | None = None def __post_init__(self): self.recency_column = column(self.recency_column_name) @@ -105,7 +106,18 @@ def readable_config(self): config_list: list[_TableConfig] = [ _TableConfig(table_name="job", recency_column_name="latest_heartbeat"), - _TableConfig(table_name="dag", recency_column_name="last_parsed_time"), + _TableConfig( + table_name="dag", + recency_column_name="last_parsed_time", + dependent_tables=[ + "task_reschedule", + "task_instance_history", + "xcom", + "task_instance", + "dag_run", + "dag_version", + ], + ), _TableConfig( table_name="dag_run", recency_column_name="start_date", @@ -113,12 +125,17 @@ def readable_config(self): keep_last=True, keep_last_filters=[column("run_type") != DagRunType.MANUAL], keep_last_group_by=["dag_id"], + dependent_tables=["xcom", "task_reschedule", "task_instance_history", "task_instance"], ), _TableConfig(table_name="asset_event", recency_column_name="timestamp"), _TableConfig(table_name="import_error", recency_column_name="timestamp"), _TableConfig(table_name="log", recency_column_name="dttm"), _TableConfig(table_name="sla_miss", recency_column_name="timestamp"), - _TableConfig(table_name="task_instance", recency_column_name="start_date"), + _TableConfig( + table_name="task_instance", + recency_column_name="start_date", + dependent_tables=["task_instance_history", "xcom", "task_reschedule"], + ), _TableConfig(table_name="task_instance_history", recency_column_name="start_date"), _TableConfig(table_name="task_reschedule", recency_column_name="start_date"), _TableConfig(table_name="xcom", recency_column_name="timestamp"), @@ -126,8 +143,16 @@ def readable_config(self): _TableConfig(table_name="callback_request", recency_column_name="created_at"), _TableConfig(table_name="celery_taskmeta", recency_column_name="date_done"), _TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"), - _TableConfig(table_name="trigger", recency_column_name="created_date"), - _TableConfig(table_name="dag_version", recency_column_name="created_at"), + _TableConfig( + table_name="trigger", + recency_column_name="created_date", + dependent_tables=["task_reschedule", "xcom", "task_instance_history", "task_instance"], + ), + _TableConfig( + table_name="dag_version", + recency_column_name="created_at", + dependent_tables=["task_instance_history", "task_reschedule", "xcom", "task_instance", "dag_run"], + ), ] # We need to have `fallback="database"` because this is executed at top level code and provider configuration @@ -169,43 +194,6 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str raise AirflowException(f"Export format {export_format} is not supported.") -def get_all_dependent_tables(root_table: str, session: Session) -> list[dict]: - inspector = inspect(session.get_bind()) - config_table_names = set(config_dict.keys()) - visited = set() - dependent_fks = [] - - def visit(table_name: str): - if table_name in visited: - return - visited.add(table_name) - - for other_table in inspector.get_table_names(): - if other_table not in config_table_names: - continue - - for fk in inspector.get_foreign_keys(other_table): - if ( 
- fk.get("referred_table") == table_name - and fk.get("constrained_columns") - and fk.get("referred_columns") - ): - dependent_fks.append( - { - "table_name": other_table, - "fk_column": fk["constrained_columns"][0], - "referred_table": table_name, - "referred_column": fk["referred_columns"][0], - } - ) - # Recurse into the next level - visit(other_table) - - visit(root_table) - print(f"dependent_fks are {dependent_fks}") - return dependent_fks - - def _do_delete( *, query: Query, @@ -360,33 +348,6 @@ def _build_query( return query -def topologically_sort_tables(table_names, session: Session) -> list[str]: - inspector = inspect(session.get_bind()) - graph = defaultdict(set) - reverse_graph = defaultdict(set) - - # Build graph of FK relationships - for table_name in table_names: - for fk in inspector.get_foreign_keys(table_name): - referred = fk.get("referred_table") - if referred in table_names: - graph[table_name].add(referred) - reverse_graph[referred].add(table_name) - - no_deps = deque([t for t in table_names if not graph[t]]) - sorted_order = [] - - while no_deps: - table_name = no_deps.popleft() - sorted_order.append(table_name) - for dependent in reverse_graph[table_name]: - graph[dependent].remove(table_name) - if not graph[dependent]: - no_deps.append(dependent) - - return list(reversed(sorted_order)) - - def _cleanup_table( *, orm_model, @@ -430,21 +391,16 @@ def _cleanup_single_table(model, recency_col, keep_last_args=None, skip_delete=F skip_delete=skip_delete, ) - # Get all recursive dependencies including children-of-children - dependent_fks = get_all_dependent_tables(orm_model.name, session) - - # Extract table names and topologically sort deepest first - dep_table_names = [fk["table_name"] for fk in dependent_fks] - sorted_tables = topologically_sort_tables(dep_table_names, session) - print(f"sorted_tables are {sorted_tables}") - - # Cleanup all dependent tables first - for table_name in sorted_tables: - metadata = reflect_tables([table_name], session) - model = metadata.tables[table_name] - recency_col_name = config_dict[model.name].recency_column_name - recency_col = column(recency_col_name) - _cleanup_single_table(model, recency_col, skip_delete=True) + dep_table_names = config_dict[orm_model.name].dependent_tables + if dep_table_names is not None: + # Cleanup all dependent tables first + print(f"archiving dependent tables {dep_table_names}") + for table_name in dep_table_names: + metadata = reflect_tables([table_name], session) + model = metadata.tables[table_name] + recency_col_name = config_dict[model.name].recency_column_name + recency_col = column(recency_col_name) + _cleanup_single_table(model, recency_col, skip_delete=True) # Cleanup original table _cleanup_single_table( diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index e48f730085c5f..6b008aaa02167 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -46,7 +46,6 @@ config_dict, drop_archived_tables, export_archived_records, - get_all_dependent_tables, run_cleanup, ) from airflow.utils.session import create_session @@ -305,16 +304,15 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, r raise Exception("unexpected") @pytest.mark.parametrize( - "table_name, expected_deps, expected_archived", + "table_name, expected_archived", [ ( "dag_run", - {"task_instance", "xcom", "task_reschedule", "task_instance_history"}, {"dag_run", "task_instance"}, # Only 
these are populated ), ], ) - def test_fk_detection_and_archival_integration(self, table_name, expected_deps, expected_archived): + def test_archival_integration(self, table_name, expected_archived): """ Integration test that verifies: @@ -335,9 +333,8 @@ def test_fk_detection_and_archival_integration(self, table_name, expected_deps, create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL) with create_session() as session: - fk_deps = get_all_dependent_tables(table_name, session) - dep_table_names = {fk["table_name"] for fk in fk_deps} - assert expected_deps <= dep_table_names, f"Missing FK tables: {expected_deps - dep_table_names}" + # dep_table_names = config_dict[table_name.name].dependent_tables + # assert expected_deps <= dep_table_names, f"Missing FK tables: {expected_deps - dep_table_names}" clean_before_date = base_date.add(days=10) _cleanup_table( From 0b37b80f01f2d9fcfa9683e42765ee85c75164d6 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 23 Jun 2025 17:45:53 +0530 Subject: [PATCH 05/15] fix table removal by typo --- airflow-core/src/airflow/utils/db_cleanup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 975f2adab78a4..a7e9bdabf847d 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -153,6 +153,7 @@ def readable_config(self): recency_column_name="created_at", dependent_tables=["task_instance_history", "task_reschedule", "xcom", "task_instance", "dag_run"], ), + _TableConfig(table_name="deadline", recency_column_name="deadline_time"), ] # We need to have `fallback="database"` because this is executed at top level code and provider configuration From 4d867fa411d9acd77eeeb634523691a990bbbc3e Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 23 Jun 2025 22:22:04 +0530 Subject: [PATCH 06/15] remove task rechedule from archival --- airflow-core/src/airflow/utils/db_cleanup.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index a7e9bdabf847d..34b6aef4472f8 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -82,6 +82,9 @@ class _TableConfig: keep_last: bool = False keep_last_filters: Any | None = None keep_last_group_by: Any | None = None + # We explicitly list these tables instead of detecting foreign keys automatically, + # because the relationships are unlikely to change and the number of tables is small. + # Relying on automation here would increase complexity and reduce maintainability. 
dependent_tables: list[str] | None = None def __post_init__(self): @@ -110,7 +113,6 @@ def readable_config(self): table_name="dag", recency_column_name="last_parsed_time", dependent_tables=[ - "task_reschedule", "task_instance_history", "xcom", "task_instance", @@ -125,7 +127,7 @@ def readable_config(self): keep_last=True, keep_last_filters=[column("run_type") != DagRunType.MANUAL], keep_last_group_by=["dag_id"], - dependent_tables=["xcom", "task_reschedule", "task_instance_history", "task_instance"], + dependent_tables=["xcom", "task_instance_history", "task_instance"], ), _TableConfig(table_name="asset_event", recency_column_name="timestamp"), _TableConfig(table_name="import_error", recency_column_name="timestamp"), @@ -134,7 +136,7 @@ def readable_config(self): _TableConfig( table_name="task_instance", recency_column_name="start_date", - dependent_tables=["task_instance_history", "xcom", "task_reschedule"], + dependent_tables=["task_instance_history", "xcom"], ), _TableConfig(table_name="task_instance_history", recency_column_name="start_date"), _TableConfig(table_name="task_reschedule", recency_column_name="start_date"), @@ -146,12 +148,12 @@ def readable_config(self): _TableConfig( table_name="trigger", recency_column_name="created_date", - dependent_tables=["task_reschedule", "xcom", "task_instance_history", "task_instance"], + dependent_tables=["xcom", "task_instance_history", "task_instance"], ), _TableConfig( table_name="dag_version", recency_column_name="created_at", - dependent_tables=["task_instance_history", "task_reschedule", "xcom", "task_instance", "dag_run"], + dependent_tables=["task_instance_history", "xcom", "task_instance", "dag_run"], ), _TableConfig(table_name="deadline", recency_column_name="deadline_time"), ] From ad9c1a3590e6f3f17861b75e262346321da7a350 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 23 Jun 2025 23:55:43 +0530 Subject: [PATCH 07/15] add only direct dependency and let code find all indirects --- airflow-core/src/airflow/utils/db_cleanup.py | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 34b6aef4472f8..c1c66943cabb3 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -112,13 +112,7 @@ def readable_config(self): _TableConfig( table_name="dag", recency_column_name="last_parsed_time", - dependent_tables=[ - "task_instance_history", - "xcom", - "task_instance", - "dag_run", - "dag_version", - ], + dependent_tables=["dag_version"], ), _TableConfig( table_name="dag_run", @@ -127,7 +121,7 @@ def readable_config(self): keep_last=True, keep_last_filters=[column("run_type") != DagRunType.MANUAL], keep_last_group_by=["dag_id"], - dependent_tables=["xcom", "task_instance_history", "task_instance"], + dependent_tables=["task_instance"], ), _TableConfig(table_name="asset_event", recency_column_name="timestamp"), _TableConfig(table_name="import_error", recency_column_name="timestamp"), @@ -148,12 +142,12 @@ def readable_config(self): _TableConfig( table_name="trigger", recency_column_name="created_date", - dependent_tables=["xcom", "task_instance_history", "task_instance"], + dependent_tables=["task_instance"], ), _TableConfig( table_name="dag_version", recency_column_name="created_at", - dependent_tables=["task_instance_history", "xcom", "task_instance", "dag_run"], + dependent_tables=["task_instance", "dag_run"], ), _TableConfig(table_name="deadline", 
recency_column_name="deadline_time"), ] @@ -394,9 +388,18 @@ def _cleanup_single_table(model, recency_col, keep_last_args=None, skip_delete=F skip_delete=skip_delete, ) - dep_table_names = config_dict[orm_model.name].dependent_tables - if dep_table_names is not None: - # Cleanup all dependent tables first + def get_recursive_deps(table_name: str, seen: set[str]) -> list[str]: + deps = config_dict[table_name].dependent_tables or [] + all_deps = [] + for dep in deps: + if dep not in seen: + seen.add(dep) + all_deps.append(dep) + all_deps.extend(get_recursive_deps(dep, seen)) + return all_deps + + dep_table_names = get_recursive_deps(orm_model.name, seen=set()) + if dep_table_names: print(f"archiving dependent tables {dep_table_names}") for table_name in dep_table_names: metadata = reflect_tables([table_name], session) From 2325921c4ddaf94b64c458d791a75af32734349a Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 24 Jun 2025 22:23:17 +0530 Subject: [PATCH 08/15] update logic to find dependent table sin _effective_table_names method --- airflow-core/src/airflow/utils/db_cleanup.py | 115 ++++++++---------- .../tests/unit/utils/test_db_cleanup.py | 39 +++--- 2 files changed, 71 insertions(+), 83 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index c1c66943cabb3..8481e8f3fb3a5 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -358,66 +358,34 @@ def _cleanup_table( skip_archive: bool = False, session: Session, batch_size: int | None = None, + skip_delete: bool = False, **kwargs, ) -> None: print() if dry_run: print(f"Performing dry run for table {orm_model.name}") - - def _cleanup_single_table(model, recency_col, keep_last_args=None, skip_delete=False): - print(f"Checking table {model.name}") - query = _build_query( - orm_model=model, - recency_column=recency_col, - keep_last=keep_last_args.get("keep_last") if keep_last_args else None, - keep_last_filters=keep_last_args.get("keep_last_filters") if keep_last_args else None, - keep_last_group_by=keep_last_args.get("keep_last_group_by") if keep_last_args else None, - clean_before_timestamp=clean_before_timestamp, + query = _build_query( + orm_model=orm_model, + recency_column=recency_column, + keep_last=keep_last, + keep_last_filters=keep_last_filters, + keep_last_group_by=keep_last_group_by, + clean_before_timestamp=clean_before_timestamp, + session=session, + ) + logger.debug("old rows query:\n%s", query.selectable.compile()) + print(f"Checking table {orm_model.name}") + num_rows = _check_for_rows(query=query, print_rows=False) + + if num_rows and not dry_run: + _do_delete( + query=query, + orm_model=orm_model, + skip_archive=skip_archive, session=session, + batch_size=batch_size, + skip_delete=skip_delete, ) - logger.debug("old rows query:\n%s", query.selectable.compile()) - num_rows = _check_for_rows(query=query, print_rows=False) - - if num_rows and not dry_run: - _do_delete( - query=query, - orm_model=model, - skip_archive=skip_archive, - session=session, - batch_size=batch_size, - skip_delete=skip_delete, - ) - - def get_recursive_deps(table_name: str, seen: set[str]) -> list[str]: - deps = config_dict[table_name].dependent_tables or [] - all_deps = [] - for dep in deps: - if dep not in seen: - seen.add(dep) - all_deps.append(dep) - all_deps.extend(get_recursive_deps(dep, seen)) - return all_deps - - dep_table_names = get_recursive_deps(orm_model.name, seen=set()) - if dep_table_names: - print(f"archiving 
dependent tables {dep_table_names}") - for table_name in dep_table_names: - metadata = reflect_tables([table_name], session) - model = metadata.tables[table_name] - recency_col_name = config_dict[model.name].recency_column_name - recency_col = column(recency_col_name) - _cleanup_single_table(model, recency_col, skip_delete=True) - - # Cleanup original table - _cleanup_single_table( - orm_model, - recency_column, - keep_last_args={ - "keep_last": keep_last, - "keep_last_filters": keep_last_filters, - "keep_last_group_by": keep_last_group_by, - }, - ) session.commit() @@ -480,17 +448,37 @@ def _suppress_with_logging(table: str, session: Session): session.rollback() -def _effective_table_names(*, table_names: list[str] | None) -> tuple[set[str], dict[str, _TableConfig]]: +def _effective_table_names(*, table_names: list[str] | None) -> tuple[list[str], dict[str, _TableConfig]]: desired_table_names = set(table_names or config_dict) - effective_config_dict = {k: v for k, v in config_dict.items() if k in desired_table_names} - effective_table_names = set(effective_config_dict) - if desired_table_names != effective_table_names: - outliers = desired_table_names - effective_table_names + + visited: set[str] = set() + effective_table_names: list[str] = [] + + def collect_deps(table: str): + if table in visited: + return + visited.add(table) + config = config_dict.get(table) + if config: + for dep in config.dependent_tables or []: + collect_deps(dep) + effective_table_names.append(table) + + for table_name in desired_table_names: + collect_deps(table_name) + + effective_config_dict = {k: v for k, v in config_dict.items() if k in effective_table_names} + + outliers = desired_table_names - set(effective_config_dict) + if outliers: logger.warning( - "The following table(s) are not valid choices and will be skipped: %s", sorted(outliers) + "The following table(s) are not valid choices and will be skipped: %s", + sorted(outliers), ) - if not effective_table_names: + + if not effective_config_dict: raise SystemExit("No tables selected for db cleanup. Please choose valid table names.") + return effective_table_names, effective_config_dict @@ -546,6 +534,8 @@ def run_cleanup( :param session: Session representing connection to the metadata database. 
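+
+    Tables that reference a selected table via foreign key (listed as ``dependent_tables``
+    in its ``_TableConfig``) are included in the cleanup automatically and are processed
+    before the tables they reference, so dependent rows are archived first.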
""" clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp) + + # Get all tables to clean (root + dependents) effective_table_names, effective_config_dict = _effective_table_names(table_names=table_names) if dry_run: print("Performing dry run for db cleanup.") @@ -557,7 +547,9 @@ def run_cleanup( if not dry_run and confirm: _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names)) existing_tables = reflect_tables(tables=None, session=session).tables - for table_name, table_config in effective_config_dict.items(): + + for table_name in effective_table_names: + table_config = effective_config_dict[table_name] if table_name in existing_tables: with _suppress_with_logging(table_name, session): _cleanup_table( @@ -568,6 +560,7 @@ def run_cleanup( skip_archive=skip_archive, session=session, batch_size=batch_size, + skip_delete=(table_names is not None and table_name not in table_names), ) session.commit() else: diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 6b008aaa02167..89cfe892f3817 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -312,41 +312,36 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, r ), ], ) - def test_archival_integration(self, table_name, expected_archived): + def test_run_cleanup_archival_integration(self, table_name, expected_archived): """ Integration test that verifies: - - 1. All expected foreign key dependent tables are detected for a given table (e.g., dag_run). - 2. The cleanup logic correctly archives only those tables that actually contain data. - 3. Archive tables are created only for populated tables, and not for empty FK dependencies. - - Steps: - - Use `create_tis()` to populate dag_run and task_instance tables. - - Detect FK dependencies for `table_name` and assert all expected ones are found. - - Run `_cleanup_table()` with a future timestamp to ensure all rows are archived. - - Inspect database to check which archive tables were actually created. - - Assert that only tables with data were archived. + 1. Recursive FK-dependent tables are resolved via _effective_table_names(). + 2. run_cleanup() archives only tables with data. + 3. Archive tables are not created for empty dependent tables. 
""" base_date = pendulum.datetime(2022, 1, 1, tz="UTC") - num_tis = 5 - create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL) - with create_session() as session: - # dep_table_names = config_dict[table_name.name].dependent_tables - # assert expected_deps <= dep_table_names, f"Missing FK tables: {expected_deps - dep_table_names}" + # Create test data for DAG Run and TIs + if table_name in {"dag_run", "task_instance"}: + create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL) - clean_before_date = base_date.add(days=10) - _cleanup_table( - **config_dict[table_name].__dict__, + clean_before_date = base_date.add(days=10) + + with create_session() as session: + run_cleanup( clean_before_timestamp=clean_before_date, + table_names=[table_name], dry_run=False, + confirm=False, session=session, - table_names=[table_name], ) + # Inspect archive tables created inspector = inspect(session.bind) - archive_tables = set(t for t in inspector.get_table_names() if t.startswith(ARCHIVE_TABLE_PREFIX)) + archive_tables = { + name for name in inspector.get_table_names() if name.startswith(ARCHIVE_TABLE_PREFIX) + } actual_archived = {t.split("__", 1)[-1].split("__")[0] for t in archive_tables} assert expected_archived <= actual_archived, ( From 5bfdb36bd1992f4cc46429ac441d3f7d849cbe23 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 24 Jun 2025 11:41:35 -0600 Subject: [PATCH 09/15] Refactor how we loop over the tables --- airflow-core/src/airflow/utils/db_cleanup.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 8481e8f3fb3a5..6474edca5f4d0 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -467,7 +467,7 @@ def collect_deps(table: str): for table_name in desired_table_names: collect_deps(table_name) - effective_config_dict = {k: v for k, v in config_dict.items() if k in effective_table_names} + effective_config_dict = {n: config_dict[n] for n in effective_table_names} outliers = desired_table_names - set(effective_config_dict) if outliers: @@ -548,8 +548,7 @@ def run_cleanup( _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names)) existing_tables = reflect_tables(tables=None, session=session).tables - for table_name in effective_table_names: - table_config = effective_config_dict[table_name] + for table_name, table_config in effective_config_dict.items(): if table_name in existing_tables: with _suppress_with_logging(table_name, session): _cleanup_table( From 827ba5ededf68bb1c8fd38aa6eba4d404a0e708f Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 24 Jun 2025 23:16:25 +0530 Subject: [PATCH 10/15] adding deadline table as dependent to dag_run --- airflow-core/src/airflow/utils/db_cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 6474edca5f4d0..55a71b6aec231 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -121,7 +121,7 @@ def readable_config(self): keep_last=True, keep_last_filters=[column("run_type") != DagRunType.MANUAL], keep_last_group_by=["dag_id"], - dependent_tables=["task_instance"], + dependent_tables=["task_instance", "deadline"], ), _TableConfig(table_name="asset_event", recency_column_name="timestamp"), _TableConfig(table_name="import_error", 
recency_column_name="timestamp"), From 22497254202990e25ecc59037185e623c4e999e9 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 24 Jun 2025 11:49:37 -0600 Subject: [PATCH 11/15] Be loud if we have a dependent misconfigured --- airflow-core/src/airflow/utils/db_cleanup.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 55a71b6aec231..2f44083c2e2b3 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -458,10 +458,9 @@ def collect_deps(table: str): if table in visited: return visited.add(table) - config = config_dict.get(table) - if config: - for dep in config.dependent_tables or []: - collect_deps(dep) + config = config_dict[table] + for dep in config.dependent_tables or []: + collect_deps(dep) effective_table_names.append(table) for table_name in desired_table_names: From e5ae70f7021a57021ef93f91eeee9cebc8a6c4c1 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 24 Jun 2025 11:55:15 -0600 Subject: [PATCH 12/15] Fix detection of unknown tables in table list --- airflow-core/src/airflow/utils/db_cleanup.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 2f44083c2e2b3..8c64085737d22 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -451,6 +451,14 @@ def _suppress_with_logging(table: str, session: Session): def _effective_table_names(*, table_names: list[str] | None) -> tuple[list[str], dict[str, _TableConfig]]: desired_table_names = set(table_names or config_dict) + outliers = desired_table_names - set(config_dict.keys()) + if outliers: + logger.warning( + "The following table(s) are not valid choices and will be skipped: %s", + sorted(outliers), + ) + desired_table_names = desired_table_names - outliers + visited: set[str] = set() effective_table_names: list[str] = [] @@ -468,13 +476,6 @@ def collect_deps(table: str): effective_config_dict = {n: config_dict[n] for n in effective_table_names} - outliers = desired_table_names - set(effective_config_dict) - if outliers: - logger.warning( - "The following table(s) are not valid choices and will be skipped: %s", - sorted(outliers), - ) - if not effective_config_dict: raise SystemExit("No tables selected for db cleanup. 
Please choose valid table names.") From a490383303a6cb67d7be934dd75c3502446ec2aa Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 24 Jun 2025 23:29:50 +0530 Subject: [PATCH 13/15] adding deadline table as dependent to dag --- airflow-core/src/airflow/utils/db_cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 8c64085737d22..4649f439ae56e 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -112,7 +112,7 @@ def readable_config(self): _TableConfig( table_name="dag", recency_column_name="last_parsed_time", - dependent_tables=["dag_version"], + dependent_tables=["dag_version", "deadline"], ), _TableConfig( table_name="dag_run", From df11f3b8a175574cc76f4185f92ccde44476058e Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Tue, 24 Jun 2025 12:35:18 -0600 Subject: [PATCH 14/15] Remove skip_delete --- airflow-core/src/airflow/utils/db_cleanup.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 4649f439ae56e..d0bf5755fed20 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -198,7 +198,6 @@ def _do_delete( skip_archive: bool, session: Session, batch_size: int | None, - skip_delete: bool = False, ) -> None: import itertools import re @@ -259,9 +258,6 @@ def _do_delete( delete = source_table.delete().where( and_(col == target_table.c[col.name] for col in source_table.primary_key.columns) ) - if skip_delete: - print(f"Skipping deletion for FK table {orm_model.name}; relying on ON DELETE CASCADE.") - return logger.debug("delete statement:\n%s", delete.compile()) session.execute(delete) session.commit() @@ -358,7 +354,6 @@ def _cleanup_table( skip_archive: bool = False, session: Session, batch_size: int | None = None, - skip_delete: bool = False, **kwargs, ) -> None: print() @@ -384,7 +379,6 @@ def _cleanup_table( skip_archive=skip_archive, session=session, batch_size=batch_size, - skip_delete=skip_delete, ) session.commit() @@ -559,7 +553,6 @@ def run_cleanup( skip_archive=skip_archive, session=session, batch_size=batch_size, - skip_delete=(table_names is not None and table_name not in table_names), ) session.commit() else: From e0a6ea87146ef7aa4e3d288b176ee35b5fc3b25b Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Tue, 24 Jun 2025 12:54:10 -0600 Subject: [PATCH 15/15] Update airflow-core/src/airflow/utils/db_cleanup.py --- airflow-core/src/airflow/utils/db_cleanup.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index d0bf5755fed20..96e803775f803 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -192,12 +192,7 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str def _do_delete( - *, - query: Query, - orm_model: Base, - skip_archive: bool, - session: Session, - batch_size: int | None, + *, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None ) -> None: import itertools import re
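A minimal, standalone sketch of the ordering behaviour introduced by `_effective_table_names` in this series (the helper name `expand_with_dependents` is illustrative only, and the `deps` mapping below is a hand-copied subset of the final `dependent_tables` entries in `config_list`): the post-order traversal puts FK-dependent tables before the tables they reference, so dependents are archived and deleted first.

# Illustrative sketch only -- mirrors the post-order traversal used by
# _effective_table_names(); "deps" is a toy subset of dependent_tables from config_list.
def expand_with_dependents(requested: list[str], deps: dict[str, list[str]]) -> list[str]:
    """Return the requested tables plus all transitive dependents, dependents first."""
    visited: set[str] = set()
    ordered: list[str] = []

    def collect(table: str) -> None:
        if table in visited:
            return
        visited.add(table)
        for dep in deps.get(table, []):
            collect(dep)  # visit dependent (child) tables before appending the parent
        ordered.append(table)

    for table in requested:
        collect(table)
    return ordered


deps = {
    "dag": ["dag_version", "deadline"],
    "dag_version": ["task_instance", "dag_run"],
    "dag_run": ["task_instance", "deadline"],
    "task_instance": ["task_instance_history", "xcom"],
}

print(expand_with_dependents(["dag"], deps))
# ['task_instance_history', 'xcom', 'task_instance', 'deadline', 'dag_run', 'dag_version', 'dag']

Because `run_cleanup` iterates the resulting (insertion-ordered) config dict, each table is cleaned only after every table that holds a foreign key to it has already been handled.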