-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Refactor Sqlalchemy queries to 2.0 style (Part 3) #32177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2202690
b9a67d1
4bc06cb
57c61b8
c56bafc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,7 +92,7 @@ def _format_airflow_moved_table_name(source_table, version, category): | |
| @provide_session | ||
| def merge_conn(conn, session: Session = NEW_SESSION): | ||
| """Add new Connection.""" | ||
| if not session.query(conn.__class__).filter_by(conn_id=conn.conn_id).first(): | ||
| if not session.scalar(select(conn.__class__).filter_by(conn_id=conn.conn_id).limit(1)): | ||
| session.add(conn) | ||
| session.commit() | ||
|
|
||
|
|
@@ -959,7 +959,9 @@ def check_conn_id_duplicates(session: Session) -> Iterable[str]: | |
|
|
||
| dups = [] | ||
| try: | ||
| dups = session.query(Connection.conn_id).group_by(Connection.conn_id).having(func.count() > 1).all() | ||
| dups = session.execute( | ||
| select(Connection.conn_id).group_by(Connection.conn_id).having(func.count() > 1) | ||
| ).all() | ||
phanikumv marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| except (exc.OperationalError, exc.ProgrammingError): | ||
| # fallback if tables hasn't been created yet | ||
| session.rollback() | ||
|
|
@@ -984,12 +986,11 @@ def check_username_duplicates(session: Session) -> Iterable[str]: | |
| for model in [User, RegisterUser]: | ||
| dups = [] | ||
| try: | ||
| dups = ( | ||
| session.query(model.username) # type: ignore[attr-defined] | ||
| dups = session.execute( | ||
| select(model.username) # type: ignore[attr-defined] | ||
| .group_by(model.username) # type: ignore[attr-defined] | ||
| .having(func.count() > 1) | ||
| .all() | ||
| ) | ||
| ).all() | ||
phanikumv marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| except (exc.OperationalError, exc.ProgrammingError): | ||
| # fallback if tables hasn't been created yet | ||
| session.rollback() | ||
|
|
@@ -1058,13 +1059,13 @@ def check_task_fail_for_duplicates(session): | |
| """ | ||
| minimal_table_obj = table(table_name, *[column(x) for x in uniqueness]) | ||
| try: | ||
| subquery = ( | ||
| session.query(minimal_table_obj, func.count().label("dupe_count")) | ||
| subquery = session.execute( | ||
| select(minimal_table_obj, func.count().label("dupe_count")) | ||
| .group_by(*[text(x) for x in uniqueness]) | ||
| .having(func.count() > text("1")) | ||
| .subquery() | ||
| ) | ||
| dupe_count = session.query(func.sum(subquery.c.dupe_count)).scalar() | ||
| dupe_count = session.scalar(select(func.sum(subquery.c.dupe_count))) | ||
| if not dupe_count: | ||
| # there are no duplicates; nothing to do. | ||
| return | ||
|
|
@@ -1101,7 +1102,7 @@ def check_conn_type_null(session: Session) -> Iterable[str]: | |
|
|
||
| n_nulls = [] | ||
| try: | ||
| n_nulls = session.query(Connection.conn_id).filter(Connection.conn_type.is_(None)).all() | ||
| n_nulls = session.scalars(select(Connection.conn_id).where(Connection.conn_type.is_(None))).all() | ||
| except (exc.OperationalError, exc.ProgrammingError, exc.InternalError): | ||
| # fallback if tables hasn't been created yet | ||
| session.rollback() | ||
|
|
@@ -1143,7 +1144,7 @@ def check_run_id_null(session: Session) -> Iterable[str]: | |
| dagrun_table.c.run_id.is_(None), | ||
| dagrun_table.c.execution_date.is_(None), | ||
| ) | ||
| invalid_dagrun_count = session.query(func.count(dagrun_table.c.id)).filter(invalid_dagrun_filter).scalar() | ||
| invalid_dagrun_count = session.scalar(select(func.count(dagrun_table.c.id)).where(invalid_dagrun_filter)) | ||
| if invalid_dagrun_count > 0: | ||
| dagrun_dangling_table_name = _format_airflow_moved_table_name(dagrun_table.name, "2.2", "dangling") | ||
| if dagrun_dangling_table_name in inspect(session.get_bind()).get_table_names(): | ||
|
|
@@ -1240,7 +1241,7 @@ def _move_dangling_data_to_new_table( | |
| pk_cols = source_table.primary_key.columns | ||
|
|
||
| delete = source_table.delete().where( | ||
| tuple_(*pk_cols).in_(session.query(*target_table.primary_key.columns).subquery()) | ||
| tuple_(*pk_cols).in_(session.select(*target_table.primary_key.columns).subquery()) | ||
| ) | ||
| else: | ||
| delete = source_table.delete().where( | ||
|
|
@@ -1262,10 +1263,11 @@ def _dangling_against_dag_run(session, source_table, dag_run): | |
| source_table.c.dag_id == dag_run.c.dag_id, | ||
| source_table.c.execution_date == dag_run.c.execution_date, | ||
| ) | ||
|
|
||
| return ( | ||
| session.query(*[c.label(c.name) for c in source_table.c]) | ||
| select(*[c.label(c.name) for c in source_table.c]) | ||
| .join(dag_run, source_to_dag_run_join_cond, isouter=True) | ||
| .filter(dag_run.c.dag_id.is_(None)) | ||
| .where(dag_run.c.dag_id.is_(None)) | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -1304,10 +1306,10 @@ def _dangling_against_task_instance(session, source_table, dag_run, task_instanc | |
| ) | ||
|
|
||
| return ( | ||
| session.query(*[c.label(c.name) for c in source_table.c]) | ||
| select(*[c.label(c.name) for c in source_table.c]) | ||
|
||
| .join(dag_run, dr_join_cond, isouter=True) | ||
| .join(task_instance, ti_join_cond, isouter=True) | ||
| .filter(or_(task_instance.c.dag_id.is_(None), dag_run.c.dag_id.is_(None))) | ||
| .where(or_(task_instance.c.dag_id.is_(None), dag_run.c.dag_id.is_(None))) | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -1331,9 +1333,9 @@ def _move_duplicate_data_to_new_table( | |
| """ | ||
| bind = session.get_bind() | ||
| dialect_name = bind.dialect.name | ||
|
|
||
| query = ( | ||
| session.query(source_table) | ||
| .with_entities(*[getattr(source_table.c, x.name).label(str(x.name)) for x in source_table.columns]) | ||
| select(*[getattr(source_table.c, x.name).label(str(x.name)) for x in source_table.columns]) | ||
| .select_from(source_table) | ||
| .join(subquery, and_(*[getattr(source_table.c, x) == getattr(subquery.c, x) for x in uniqueness])) | ||
| ) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.