
S3ToGCSOperator broken in deferrable mode #41652

@Logical-sh

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-amazon = "==8.27.0"
apache-airflow-providers-google = "==10.21.1"

Apache Airflow version

2.9.1

Operating System

GCP Cloud Composer

Deployment

Google Cloud Composer

Deployment details

I manually upgraded the providers to check whether the bug still exists in the latest versions.

What happened

Received the following error when trying to use S3ToGCSOperator in deferrable mode.

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 602, in run_trigger
    async for event in trigger.run():
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py", line 77, in run
    jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 581, in get_jobs
    client = await self.get_conn()
             ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 567, in get_conn
    credentials = (await self.get_sync_hook()).get_credentials()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 776, in get_sync_hook
    self._sync_hook = await sync_to_async(self.sync_hook_class)(**self._hook_kwargs)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: sync_to_async.<locals>.<lambda>() got an unexpected keyword argument 'gcp_conn_id'
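The `<lambda>` in the last line looks like a clue. asgiref's `sync_to_async` can be used either bare or as a decorator factory, and when it is called without a function (`func` is `None`) it returns a lambda that expects the function to wrap. My guess, not verified against the provider source, is that `sync_hook_class` is `None` on the async Cloud Storage Transfer Service hook, so `get_sync_hook()` calls that lambda with the hook kwargs and it fails on the first keyword, `gcp_conn_id`. The sketch below reproduces that pattern without Airflow or asgiref; `sync_to_async_standin`, `FakeSyncHook`, `FakeAsyncHook`, and `FixedAsyncHook` are hypothetical names made up for this illustration.

```python
import asyncio
from typing import Any, Callable, Optional


def sync_to_async_standin(func: Optional[Callable[..., Any]] = None) -> Any:
    """Hypothetical, simplified stand-in for a sync_to_async-style helper.

    Like asgiref's sync_to_async, it can be used bare or as a decorator factory.
    """
    if func is None:
        # Decorator-factory path: returns a lambda expecting the function to wrap.
        # Calling this lambda with keyword arguments raises a TypeError like the one above.
        return lambda f: sync_to_async_standin(f)

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Run the blocking callable (here: a hook constructor) in a worker thread.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


class FakeSyncHook:
    """Hypothetical synchronous hook, used only for this illustration."""

    def __init__(self, gcp_conn_id: str = "google_cloud_default") -> None:
        self.gcp_conn_id = gcp_conn_id


class FakeAsyncHook:
    """Hypothetical async hook mimicking the get_sync_hook() frame in the traceback."""

    sync_hook_class: Optional[type] = None  # left unset: reproduces the error

    def __init__(self, **hook_kwargs: Any) -> None:
        self._hook_kwargs = hook_kwargs
        self._sync_hook: Optional[Any] = None

    async def get_sync_hook(self) -> Any:
        if self._sync_hook is None:
            # Same shape as the failing line in base_google.py.
            self._sync_hook = await sync_to_async_standin(self.sync_hook_class)(**self._hook_kwargs)
        return self._sync_hook


class FixedAsyncHook(FakeAsyncHook):
    sync_hook_class = FakeSyncHook  # sync hook class is set, so the call succeeds


async def main() -> None:
    try:
        await FakeAsyncHook(gcp_conn_id="my_conn").get_sync_hook()
    except TypeError as exc:
        print(f"broken hook: {exc}")
    hook = await FixedAsyncHook(gcp_conn_id="my_conn").get_sync_hook()
    print(f"fixed hook: {type(hook).__name__}(gcp_conn_id={hook.gcp_conn_id!r})")


if __name__ == "__main__":
    asyncio.run(main())
```

In this toy version, setting `sync_hook_class` (as `FixedAsyncHook` does) makes the call succeed; whether that is the right fix for the provider's hook is an assumption.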

What you think should happen instead

It should not raise an error when checking the job status on wakeup.

How to reproduce

Create an S3ToGCSOperator task with deferrable=True.

Anything else

This happens on every run of the operator. I think it occurs after the transfer job is queued, when the code tries to check the job status.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
