diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py index 02055583ce15a..40435ad3e91f9 100644 --- a/airflow/providers/google/cloud/hooks/gcs.py +++ b/airflow/providers/google/cloud/hooks/gcs.py @@ -1336,7 +1336,11 @@ def gcs_object_is_directory(bucket: str) -> bool: return len(blob) == 0 or blob.endswith("/") -def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any: +def parse_json_from_gcs( + gcp_conn_id: str, + file_uri: str, + impersonation_chain: str | Sequence[str] | None = None, +) -> Any: """ Downloads and parses json file from Google cloud Storage. @@ -1344,7 +1348,10 @@ def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any: :param file_uri: full path to json file example: ``gs://test-bucket/dir1/dir2/file`` """ - gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id) + gcs_hook = GCSHook( + gcp_conn_id=gcp_conn_id, + impersonation_chain=impersonation_chain, + ) bucket, blob = _parse_gcs_url(file_uri) with NamedTemporaryFile(mode="w+b") as file: try: diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index deadf0d193604..2896b5284f618 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -1069,6 +1069,7 @@ def execute(self, context: Context): project_id=self.job_project_id or hook.project_id, poll_interval=self.poll_interval, as_dict=self.as_dict, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) @@ -2876,6 +2877,7 @@ def execute(self, context: Any): job_id=self.job_id, project_id=self.project_id, poll_interval=self.poll_interval, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) diff --git a/airflow/providers/google/cloud/sensors/dataproc_metastore.py b/airflow/providers/google/cloud/sensors/dataproc_metastore.py index ccb222645287d..3ebf5c0f3c1d4 100644 --- 
a/airflow/providers/google/cloud/sensors/dataproc_metastore.py +++ b/airflow/providers/google/cloud/sensors/dataproc_metastore.py @@ -93,7 +93,11 @@ def poke(self, context: Context) -> bool: self.log.info("Received result manifest URI: %s", result_manifest_uri) self.log.info("Extracting result manifest") - manifest: dict = parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=result_manifest_uri) + manifest: dict = parse_json_from_gcs( + gcp_conn_id=self.gcp_conn_id, + file_uri=result_manifest_uri, + impersonation_chain=self.impersonation_chain, + ) if not (manifest and isinstance(manifest, dict)): # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 message = ( @@ -115,7 +119,13 @@ def poke(self, context: Context) -> bool: result_base_uri = result_manifest_uri.rsplit("/", 1)[0] results = (f"{result_base_uri}//{filename}" for filename in manifest.get("filenames", [])) found_partitions = sum( - len(parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=uri).get("rows", [])) + len( + parse_json_from_gcs( + gcp_conn_id=self.gcp_conn_id, + file_uri=uri, + impersonation_chain=self.impersonation_chain, + ).get("rows", []) + ) for uri in results ) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index 58456b10f9816..3ede4db32ff81 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -261,6 +261,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=self._job_id, project_id=self.project_id or self.hook.project_id, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index b22b7d8f8da54..9d8ce53f4c1b2 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ 
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -435,6 +435,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=self.job_id, project_id=self.project_id or self.hook.project_id, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", )