diff --git a/airflow/providers/common/sql/CHANGELOG.rst b/airflow/providers/common/sql/CHANGELOG.rst index 8cd190482c237..6938a7d838a7d 100644 --- a/airflow/providers/common/sql/CHANGELOG.rst +++ b/airflow/providers/common/sql/CHANGELOG.rst @@ -25,6 +25,14 @@ Changelog --------- +1.17.0 +...... + +Features +~~~~~~~~ + +* ``Connection in DB Hook is now cached to avoid multiple lookups when properties from extras have to be resolved`` + 1.16.0 ...... diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index dc66cc40bd999..33d1cdf879965 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -53,6 +53,7 @@ from pandas import DataFrame from sqlalchemy.engine import URL + from airflow.models import Connection from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.sqlparser import DatabaseInfo @@ -183,14 +184,14 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa self._replace_statement_format: str = kwargs.get( "replace_statement_format", "REPLACE INTO {} {} VALUES ({})" ) + self._connection: Connection | None = kwargs.pop("connection", None) def get_conn_id(self) -> str: return getattr(self, self.conn_name_attr) @cached_property def placeholder(self): - conn = self.get_connection(self.get_conn_id()) - placeholder = conn.extra_dejson.get("placeholder") + placeholder = self.connection_extra.get("placeholder") if placeholder: if placeholder in SQL_PLACEHOLDERS: return placeholder @@ -203,9 +204,28 @@ def placeholder(self): ) return self._placeholder + @property + def connection(self) -> Connection: + if self._connection is None: + self._connection = self.get_connection(self.get_conn_id()) + return self._connection + + @property + def connection_extra(self) -> dict: + return self.connection.extra_dejson + + @cached_property + def connection_extra_lower(self) -> dict: + """ + ``connection.extra_dejson`` but where keys are converted to lower case. + + This is used internally for case-insensitive access of extra params. + """ + return {k.lower(): v for k, v in self.connection_extra.items()} + def get_conn(self): """Return a connection object.""" - db = self.get_connection(self.get_conn_id()) + db = self.connection return self.connector.connect(host=db.host, port=db.port, username=db.login, schema=db.schema) def get_uri(self) -> str: diff --git a/airflow/providers/common/sql/hooks/sql.pyi b/airflow/providers/common/sql/hooks/sql.pyi index 625ec1d320527..4f60f4a7bbc75 100644 --- a/airflow/providers/common/sql/hooks/sql.pyi +++ b/airflow/providers/common/sql/hooks/sql.pyi @@ -38,6 +38,7 @@ from airflow.exceptions import ( AirflowProviderDeprecationWarning as AirflowProviderDeprecationWarning, ) from airflow.hooks.base import BaseHook as BaseHook +from airflow.models import Connection as Connection from airflow.providers.openlineage.extractors import OperatorLineage as OperatorLineage from airflow.providers.openlineage.sqlparser import DatabaseInfo as DatabaseInfo from functools import cached_property as cached_property @@ -67,6 +68,12 @@ class DbApiHook(BaseHook): def get_conn_id(self) -> str: ... @cached_property def placeholder(self): ... + @property + def connection(self) -> Connection: ... + @property + def connection_extra(self) -> dict: ... + @cached_property + def connection_extra_lower(self) -> dict: ... def get_conn(self): ... def get_uri(self) -> str: ... @property diff --git a/airflow/providers/common/sql/provider.yaml b/airflow/providers/common/sql/provider.yaml index 06c43af1d8db6..f600acc1fa44b 100644 --- a/airflow/providers/common/sql/provider.yaml +++ b/airflow/providers/common/sql/provider.yaml @@ -25,6 +25,7 @@ state: ready source-date-epoch: 1723970051 # note that those versions are maintained by release manager - do not update them manually versions: + - 1.17.0 - 1.16.0 - 1.15.0 - 1.14.2 diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py index ca90400177791..31633574bfcdf 100644 --- a/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -93,12 +93,11 @@ class ElasticsearchSQLHook(DbApiHook): def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs): super().__init__(*args, **kwargs) self.schema = schema - self.connection = connection + self._connection = connection def get_conn(self) -> ESConnection: """Return an elasticsearch connection object.""" - conn_id = self.get_conn_id() - conn = self.connection or self.get_connection(conn_id) + conn = self.connection conn_args = { "host": conn.host, @@ -117,8 +116,7 @@ def get_conn(self) -> ESConnection: return connect(**conn_args) def get_uri(self) -> str: - conn_id = self.get_conn_id() - conn = self.connection or self.get_connection(conn_id) + conn = self.connection login = "" if conn.login: diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index f0417b7e6d945..102c0ede7e02f 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -68,7 +68,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - elasticsearch>=8.10,<9 integrations: diff --git a/airflow/providers/jdbc/hooks/jdbc.py b/airflow/providers/jdbc/hooks/jdbc.py index 81c63cbe3da16..27a438ae414cf 100644 --- a/airflow/providers/jdbc/hooks/jdbc.py +++ b/airflow/providers/jdbc/hooks/jdbc.py @@ -106,16 +106,6 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: "relabeling": {"host": "Connection URL"}, } - @property - def connection_extra_lower(self) -> dict: - """ - ``connection.extra_dejson`` but where keys are converted to lower case. - - This is used internally for case-insensitive access of jdbc params. - """ - conn = self.get_connection(self.get_conn_id()) - return {k.lower(): v for k, v in conn.extra_dejson.items()} - @property def driver_path(self) -> str | None: from airflow.configuration import conf diff --git a/airflow/providers/jdbc/provider.yaml b/airflow/providers/jdbc/provider.yaml index 96a0e0f84ff2f..2178b5dea91fa 100644 --- a/airflow/providers/jdbc/provider.yaml +++ b/airflow/providers/jdbc/provider.yaml @@ -53,7 +53,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - jaydebeapi>=1.1.1 integrations: diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py b/airflow/providers/microsoft/mssql/hooks/mssql.py index 73b8beb5777b8..d45a43a188ce6 100644 --- a/airflow/providers/microsoft/mssql/hooks/mssql.py +++ b/airflow/providers/microsoft/mssql/hooks/mssql.py @@ -19,8 +19,7 @@ from __future__ import annotations -from functools import cached_property -from typing import TYPE_CHECKING, Any +from typing import Any import pymssql from methodtools import lru_cache @@ -28,9 +27,6 @@ from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler -if TYPE_CHECKING: - from airflow.models import Connection - class MsSqlHook(DbApiHook): """ @@ -59,24 +55,6 @@ def __init__( self.schema = kwargs.pop("schema", None) self._sqlalchemy_scheme = sqlalchemy_scheme - @cached_property - def connection(self) -> Connection: - """ - Get the airflow connection object. - - :return: The connection object. - """ - return self.get_connection(self.get_conn_id()) - - @property - def connection_extra_lower(self) -> dict: - """ - ``connection.extra_dejson`` but where keys are converted to lower case. - - This is used internally for case-insensitive access of mssql params. - """ - return {k.lower(): v for k, v in self.connection.extra_dejson.items()} - @property def sqlalchemy_scheme(self) -> str: """Sqlalchemy scheme either from constructor, connection extras or default.""" diff --git a/airflow/providers/microsoft/mssql/provider.yaml b/airflow/providers/microsoft/mssql/provider.yaml index 3ae91673b5982..7eab3a208dd43 100644 --- a/airflow/providers/microsoft/mssql/provider.yaml +++ b/airflow/providers/microsoft/mssql/provider.yaml @@ -55,7 +55,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - pymssql>=2.3.0 # The methodtools dependency can be removed with min airflow version >=2.9.1 # as it was added in https://github.com/apache/airflow/pull/37757 diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py index 78376d3f9c705..a6e40ad32b87c 100644 --- a/airflow/providers/mysql/hooks/mysql.py +++ b/airflow/providers/mysql/hooks/mysql.py @@ -72,7 +72,6 @@ class MySqlHook(DbApiHook): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.schema = kwargs.pop("schema", None) - self.connection = kwargs.pop("connection", None) self.local_infile = kwargs.pop("local_infile", False) self.init_command = kwargs.pop("init_command", None) diff --git a/airflow/providers/mysql/provider.yaml b/airflow/providers/mysql/provider.yaml index 309b4dc727fce..355d23f946da7 100644 --- a/airflow/providers/mysql/provider.yaml +++ b/airflow/providers/mysql/provider.yaml @@ -66,7 +66,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - mysqlclient>=1.4.0 - mysql-connector-python>=8.0.29 diff --git a/airflow/providers/odbc/hooks/odbc.py b/airflow/providers/odbc/hooks/odbc.py index 61a5818f8e2ae..48dada49f88ba 100644 --- a/airflow/providers/odbc/hooks/odbc.py +++ b/airflow/providers/odbc/hooks/odbc.py @@ -79,13 +79,6 @@ def __init__( self._connection = None self._connect_kwargs = connect_kwargs - @property - def connection(self): - """The Connection object with ID ``odbc_conn_id``.""" - if not self._connection: - self._connection = self.get_connection(self.get_conn_id()) - return self._connection - @property def database(self) -> str | None: """Database provided in init if exists; otherwise, ``schema`` from ``Connection`` object.""" @@ -99,15 +92,6 @@ def sqlalchemy_scheme(self) -> str: raise RuntimeError("sqlalchemy_scheme in connection extra should not contain : or / characters") return self._sqlalchemy_scheme or extra_scheme or self.DEFAULT_SQLALCHEMY_SCHEME - @property - def connection_extra_lower(self) -> dict: - """ - ``connection.extra_dejson`` but where keys are converted to lower case. - - This is used internally for case-insensitive access of odbc params. - """ - return {k.lower(): v for k, v in self.connection.extra_dejson.items()} - @property def driver(self) -> str | None: """Driver from init param if given; else try to find one in connection extra.""" @@ -166,9 +150,7 @@ def odbc_connection_string(self): conn_str += f"PORT={self.connection.port};" extra_exclude = {"driver", "dsn", "connect_kwargs", "sqlalchemy_scheme", "placeholder"} - extra_params = { - k: v for k, v in self.connection.extra_dejson.items() if k.lower() not in extra_exclude - } + extra_params = {k: v for k, v in self.connection_extra.items() if k.lower() not in extra_exclude} for k, v in extra_params.items(): conn_str += f"{k}={v};" diff --git a/airflow/providers/odbc/provider.yaml b/airflow/providers/odbc/provider.yaml index 2f3a0b418f84e..1a0dd5d7d3f76 100644 --- a/airflow/providers/odbc/provider.yaml +++ b/airflow/providers/odbc/provider.yaml @@ -54,7 +54,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - pyodbc>=5.0.0 integrations: diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index ede21975aec67..4790575565fce 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -88,7 +88,6 @@ def __init__(self, *args, options: str | None = None, **kwargs) -> None: ) kwargs["database"] = kwargs["schema"] super().__init__(*args, **kwargs) - self.connection: Connection | None = kwargs.pop("connection", None) self.conn: connection = None self.database: str | None = kwargs.pop("database", None) self.options = options @@ -142,8 +141,7 @@ def _get_cursor(self, raw_cursor: str) -> CursorType: def get_conn(self) -> connection: """Establish a connection to a postgres database.""" - conn_id = self.get_conn_id() - conn = deepcopy(self.connection or self.get_connection(conn_id)) + conn = deepcopy(self.connection) # check for authentication via AWS IAM if conn.extra_dejson.get("iam", False): diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml index 3de77272484f3..8a464340712a4 100644 --- a/airflow/providers/postgres/provider.yaml +++ b/airflow/providers/postgres/provider.yaml @@ -65,7 +65,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-common-sql>=1.14.1 + - apache-airflow-providers-common-sql>=1.17.0 - psycopg2-binary>=2.9.4 additional-extras: diff --git a/dev/breeze/tests/test_packages.py b/dev/breeze/tests/test_packages.py index 3294c197e5849..9556ae695be8e 100644 --- a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -204,7 +204,7 @@ def test_get_documentation_package_path(): "postgres", "beta0", """ - "apache-airflow-providers-common-sql>=1.14.1b0", + "apache-airflow-providers-common-sql>=1.17.0b0", "apache-airflow>=2.8.0b0", "psycopg2-binary>=2.9.4", """, @@ -214,7 +214,7 @@ def test_get_documentation_package_path(): "postgres", "", """ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "psycopg2-binary>=2.9.4", """, diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index cc1f796f75fde..7f85666892218 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -518,7 +518,7 @@ }, "elasticsearch": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "elasticsearch>=8.10,<9" ], @@ -756,7 +756,7 @@ }, "jdbc": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "jaydebeapi>=1.1.1" ], @@ -820,7 +820,7 @@ }, "microsoft.mssql": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "methodtools>=0.4.7", "pymssql>=2.3.0" @@ -871,7 +871,7 @@ }, "mysql": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "mysql-connector-python>=8.0.29", "mysqlclient>=1.4.0" @@ -902,7 +902,7 @@ }, "odbc": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "pyodbc>=5.0.0" ], @@ -1047,7 +1047,7 @@ }, "postgres": { "deps": [ - "apache-airflow-providers-common-sql>=1.14.1", + "apache-airflow-providers-common-sql>=1.17.0", "apache-airflow>=2.8.0", "psycopg2-binary>=2.9.4" ],