Conversation

@amoghrajesh (Contributor)
closes: #45233

Why?

#46869 introduced a way to get variables during task execution as well as at DAG parse time. We need to extend this support to connections as well.

What?

  • Introducing a method in the "definitions" Connection interface so that DAG authors can get connections in their DAGs.
  • Exposing that method through the BaseHook.get_connection interface with an AF 3 check.
  • Also adding a deprecation warning for it.
  • Over time, we should enforce usage of Connection.get from the task SDK instead.
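The dispatch described in the bullets above can be sketched roughly as follows. This is a hedged illustration, not the actual Airflow implementation: the `SDKConnection` stand-in, the `AF3` flag, and the stored connection data are all hypothetical; in the real code the AF 3 check and the SDK `Connection` class live inside the Airflow and task-sdk packages.

```python
import warnings

class SDKConnection:
    """Hypothetical stand-in for the task SDK's Connection definition
    (airflow.sdk.definitions.connection.Connection in this PR)."""
    _store = {"airflow_db": {"conn_type": "mysql", "host": "mysql"}}

    @classmethod
    def get(cls, conn_id):
        # In the real SDK this asks the supervisor process for the
        # connection; here we just read from a local dict.
        return cls._store[conn_id]

AF3 = True  # stand-in for the runtime "are we on Airflow 3?" check

class BaseHook:
    @classmethod
    def get_connection(cls, conn_id):
        if AF3:
            # Deprecation path: steer DAG authors toward Connection.get
            warnings.warn(
                "BaseHook.get_connection is deprecated on Airflow 3; "
                "use Connection.get from the task SDK instead",
                DeprecationWarning,
                stacklevel=2,
            )
            return SDKConnection.get(conn_id)
        raise NotImplementedError("Airflow 2 lookup path not sketched here")
```

With this shape, existing DAGs that call `BaseHook.get_connection` keep working on AF 3 but emit a `DeprecationWarning` pointing at `Connection.get`.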

Testing

DAG1: BaseHook.get_connection at task level

from __future__ import annotations

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow import DAG

class CustomOperator(BaseOperator):
    def execute(self, context):
        c = BaseHook.get_connection(conn_id="airflow_db")
        print("The connection is", c)


with DAG("example_get_connection", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn")

Logs:

{"timestamp":"2025-02-25T06:13:39.152691Z","level":"info","event":"The connection is Connection(conn_id=\'airflow_db\', conn_type=\'mysql\', description=None, host=\'mysql\', schema=\'airflow\', login=\'root\', password=None, port=None, extra=None)","chan":"stdout","logger":"task"}

{"timestamp":"2025-02-25T06:13:39.152775","level":"debug","event":"Sending request","json":"{\\"state\\":\\"success\\",\\"end_date\\":\\"2025-02-25T06:13:39.152708Z\\",\\"task_outlets\\":[],\\"outlet_events\\":[],\\"type\\":\\"SucceedTask\\"}","logger":"task"}

DAG2: BaseHook.get_connection at DAG level

from __future__ import annotations

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow import DAG

c = BaseHook.get_connection(conn_id="airflow_db")

class CustomOperator(BaseOperator):
    def execute(self, context):
        print("The connection is", c)


with DAG("example_get_connection_top_level", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn")
Logs:
{"timestamp":"2025-02-25T06:16:21.954152","level":"debug","event":"Sending request","json":"{\\"conn_id\\":\\"airflow_db\\",\\"type\\":\\"GetConnection\\"}","logger":"task"}

{"timestamp":"2025-02-25T06:16:21.961602","level":"debug","event":"Loaded DAG <DAG: example_get_connection_top_level>","logger":"airflow.models.dagbag.DagBag"}

{"timestamp":"2025-02-25T06:16:21.961721","level":"debug","event":"DAG file parsed","file":"dags/get_connection_basehook_top_level.py","logger":"task"}

{"timestamp":"2025-02-25T06:16:21.962241","level":"debug","event":"Calling \'on_task_instance_running\' with {\'previous_state\': <TaskInstanceState.QUEUED: \'queued\'>, \'task_instance\': RuntimeTaskInstance(id=UUID(\'01953bbf-cd5e-74f9-9101-c2bd5f1ffa94\'), task_id=\'print_conn\', dag_id=\'example_get_connection_top_level\', run_id=\'manual__2025-02-25T06:16:20.565213+00:00_PdRu2Pao\', try_number=1, map_index=-1, hostname=\'c1e5f155bdf2\', task=<Task(CustomOperator): print_conn>, max_tries=0, start_date=datetime.datetime(2025, 2, 25, 6, 16, 21, 564673, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"}

{"timestamp":"2025-02-25T06:16:21.962279","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"}

{"timestamp":"2025-02-25T06:16:21.962310","level":"debug","event":"Result from \'on_task_instance_running\': []","logger":"airflow.listeners.listener"}

{"timestamp":"2025-02-25T06:16:21.976790","level":"warning","event":"CustomOperator.execute cannot be called outside TaskInstance!","logger":"airflow.task.operators.unusual_prefix_c7b960ed9a4e578b5a5d62847ae08ffc97bb8849_get_connection_basehook_top_level.CustomOperator"}

{"timestamp":"2025-02-25T06:16:21.976990","level":"debug","event":"Sending request","json":"{\\"state\\":\\"success\\",\\"end_date\\":\\"2025-02-25T06:16:21.976903Z\\",\\"task_outlets\\":[],\\"outlet_events\\":[],\\"type\\":\\"SucceedTask\\"}","logger":"task"}

In addition to this, we can now also use the Connection object to get a connection via Connection.get:

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.sdk.definitions.connection import Connection

gc = Connection.get("athena_default")

class CustomOperator(BaseOperator):
    def execute(self, context):
        c = Connection.get(conn_id="airflow_db")
        print("The connection is", c)

        print("==="*50)

        print("The global connection is", gc)


with DAG("example_get_connection_from_defn", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn")

Logs:

{"timestamp":"2025-02-25T08:41:11.497544","level":"warning","event":"CustomOperator.execute cannot be called outside TaskInstance!","logger":"airflow.task.operators.unusual_prefix_86546004ac79c62cfec07aac2421c5cd02812cec_get_connection_sdk.CustomOperator"}

{"timestamp":"2025-02-25T08:41:11.497700","level":"debug","event":"Sending request","json":"{\\"conn_id\\":\\"airflow_db\\",\\"type\\":\\"GetConnection\\"}","logger":"task"}

{"timestamp":"2025-02-25T08:41:11.516903Z","level":"info","event":"The connection is Connection(conn_id=\'airflow_db\', conn_type=\'mysql\', description=None, host=\'mysql\', schema=\'airflow\', login=\'root\', password=None, port=None, extra=None)","chan":"stdout","logger":"task"}

{"timestamp":"2025-02-25T08:41:11.517403Z","level":"info","event":"======================================================================================================================================================","chan":"stdout","logger":"task"}

{"timestamp":"2025-02-25T08:41:11.522362Z","level":"info","event":"The global connection is Connection(conn_id=\'athena_default\', conn_type=\'athena\', description=None, host=None, schema=None, login=None, password=None, port=None, extra=None)","chan":"stdout","logger":"task"}

{"timestamp":"2025-02-25T08:41:11.517086","level":"debug","event":"Sending request","json":"{\\"state\\":\\"success\\",\\"end_date\\":\\"2025-02-25T08:41:11.516715Z\\",\\"task_outlets\\":[],\\"outlet_events\\":[],\\"type\\":\\"SucceedTask\\"}","logger":"task"}


amoghrajesh and others added 3 commits February 25, 2025 22:27
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
@amoghrajesh amoghrajesh self-assigned this Feb 26, 2025
@amoghrajesh amoghrajesh merged commit 4002d83 into apache:main Feb 26, 2025
63 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-get-connection-from-basehook branch February 26, 2025 06:42
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 28, 2025
@ramitkataria (Contributor) commented Mar 6, 2025

Hi @amoghrajesh, this PR seems to have affected some existing functionality in the AWS hook, specifically the boto credential fallback implemented here:

self.log.info(
    "No connection ID provided. Fallback on boto3 credential strategy (region_name=%r). "
    "See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html",
    self.region_name,
)
if deferrable:
    session = self.get_async_session()
    self._apply_session_kwargs(session)
    return session
else:
    return boto3.session.Session(region_name=self.region_name)

Earlier, if you didn't have an AWS connection, this code would create a boto session for the user, but now it looks like this code block is never reached and instead the task fails due to not having an AWS connection. It would be great if you could take a look, and let me know if you need anything else from me!

@amoghrajesh (Contributor, Author)
Hey @ramitkataria, it is not easy for us to track each provider and each usage when such custom behaviour is implemented in inheriting hooks. From your description, it seems like you know the cause of the error; could you try and potentially fix it too?

  • For starters, I would log self.conn (or any related errors) to see what is going wrong.

@ramitkataria (Contributor) commented Mar 7, 2025

Yep, makes sense - I was just hoping you might know a potential cause. I spent some time earlier trying to figure it out, but I'm currently not very familiar with the Task SDK. I'm planning to familiarize myself soon and then try debugging this again.

Also, I forgot to include some important context earlier: it did work once after breeze down, but subsequent runs didn't, so it seemed to have to do with the connection being saved in the database when it shouldn't be. Seems like that's not the case anymore.

ramitkataria added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Mar 11, 2025
…ricHook`

After apache#47048, the boto fallback strategy started breaking in
`AwsGenericHook` (explained in more detail here:
apache#47048 (comment)).
I realized that this was because a new exception `AirflowRuntimeError`
was introduced in `BaseHook`'s `get_connection`. It used to only throw
`AirflowNotFoundException` earlier. This has been temporarily resolved
in apache#47593 but to avoid issues once it does get re-introduced,
`AwsGenericHook` will now also catch `AirflowRuntimeError`.
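The widened except clause the commit message describes can be sketched as below. The exception classes, `_fetch_connection`, and `get_session` here are hypothetical stand-ins; the real fallback lives in `AwsGenericHook` and returns an actual `boto3.session.Session`.

```python
class AirflowNotFoundException(Exception):
    """Stand-in for the exception BaseHook.get_connection used to raise."""

class AirflowRuntimeError(Exception):
    """Stand-in for the new exception introduced in BaseHook.get_connection."""

def _fetch_connection(conn_id):
    # Hypothetical lookup: on Airflow 3 a missing connection now
    # surfaces as AirflowRuntimeError instead of AirflowNotFoundException.
    raise AirflowRuntimeError(f"Connection {conn_id!r} not found")

def get_session(conn_id=None):
    """Sketch of the boto fallback path: catching both exception types
    keeps the default-credential fallback working on both versions."""
    try:
        return _fetch_connection(conn_id)
    except (AirflowNotFoundException, AirflowRuntimeError):
        # Fall back to boto3's own credential resolution; a string
        # stands in for boto3.session.Session(...) here.
        return "default-boto3-session"
```

Catching the tuple rather than a single class is what makes the hook robust to the exception type changing between Airflow 2 and 3.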
ramitkataria added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Mar 24, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
Successfully merging this pull request may close these issues:

(AIP-72): Add support for getting connections via BaseHook for task sdk