From 812d98f1ba13ecb206d3a4b6a0e942728b4d2875 Mon Sep 17 00:00:00 2001 From: Cedrik Neumann Date: Fri, 19 Jan 2024 14:55:05 +0100 Subject: [PATCH 1/5] fix(MetastoreHivePartitionSensor): pass impersonation chain to GCSHook The operator already accepts `impersonation_chain`, but does not pass it to the GCSHook. --- airflow/providers/google/cloud/hooks/gcs.py | 11 +++++++++-- .../google/cloud/sensors/dataproc_metastore.py | 14 ++++++++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py index 02055583ce15a..40435ad3e91f9 100644 --- a/airflow/providers/google/cloud/hooks/gcs.py +++ b/airflow/providers/google/cloud/hooks/gcs.py @@ -1336,7 +1336,11 @@ def gcs_object_is_directory(bucket: str) -> bool: return len(blob) == 0 or blob.endswith("/") -def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any: +def parse_json_from_gcs( + gcp_conn_id: str, + file_uri: str, + impersonation_chain: str | Sequence[str] | None = None, +) -> Any: """ Downloads and parses json file from Google cloud Storage. @@ -1344,7 +1348,10 @@ def parse_json_from_gcs(gcp_conn_id: str, file_uri: str) -> Any: :param file_uri: full path to json file example: ``gs://test-bucket/dir1/dir2/file`` """ - gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id) + gcs_hook = GCSHook( + gcp_conn_id=gcp_conn_id, + impersonation_chain=impersonation_chain, + ) bucket, blob = _parse_gcs_url(file_uri) with NamedTemporaryFile(mode="w+b") as file: try: diff --git a/airflow/providers/google/cloud/sensors/dataproc_metastore.py b/airflow/providers/google/cloud/sensors/dataproc_metastore.py index ccb222645287d..3ebf5c0f3c1d4 100644 --- a/airflow/providers/google/cloud/sensors/dataproc_metastore.py +++ b/airflow/providers/google/cloud/sensors/dataproc_metastore.py @@ -93,7 +93,11 @@ def poke(self, context: Context) -> bool: self.log.info("Received result manifest URI: %s", result_manifest_uri) self.log.info("Extracting result manifest") - manifest: dict = parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=result_manifest_uri) + manifest: dict = parse_json_from_gcs( + gcp_conn_id=self.gcp_conn_id, + file_uri=result_manifest_uri, + impersonation_chain=self.impersonation_chain, + ) if not (manifest and isinstance(manifest, dict)): # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 message = ( @@ -115,7 +119,13 @@ def poke(self, context: Context) -> bool: result_base_uri = result_manifest_uri.rsplit("/", 1)[0] results = (f"{result_base_uri}//{filename}" for filename in manifest.get("filenames", [])) found_partitions = sum( - len(parse_json_from_gcs(gcp_conn_id=self.gcp_conn_id, file_uri=uri).get("rows", [])) + len( + parse_json_from_gcs( + gcp_conn_id=self.gcp_conn_id, + file_uri=uri, + impersonation_chain=self.impersonation_chain, + ).get("rows", []) + ) for uri in results ) From bbd4727f394bda2c5f06107ad11a5090085e508b Mon Sep 17 00:00:00 2001 From: Cedrik Neumann Date: Fri, 19 Jan 2024 15:00:22 +0100 Subject: [PATCH 2/5] fix(BigQueryGetDataOperator): pass impersonation chain to BigQueryGetDataTrigger The operator already accepts `impersonation_chain`, but does not pass it to the BigQueryGetDataTrigger. --- airflow/providers/google/cloud/operators/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index deadf0d193604..f688b69bc6005 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -1069,6 +1069,7 @@ def execute(self, context: Context): project_id=self.job_project_id or hook.project_id, poll_interval=self.poll_interval, as_dict=self.as_dict, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) From a1a3430fa2f8a3e8ead6a452dd5c7633d6dfeac3 Mon Sep 17 00:00:00 2001 From: Cedrik Neumann Date: Fri, 19 Jan 2024 15:02:07 +0100 Subject: [PATCH 3/5] fix(BigQueryInsertJobOperator): pass impersonation chain to BigQueryInsertJobTrigger The operator already accepts `impersonation_chain`, but does not pass it to the BigQueryInsertJobTrigger. --- airflow/providers/google/cloud/operators/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index f688b69bc6005..2896b5284f618 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2877,6 +2877,7 @@ def execute(self, context: Any): job_id=self.job_id, project_id=self.project_id, poll_interval=self.poll_interval, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) From adcbdff9bcd6ecb8d3e6d979b53b47dd98a7cb50 Mon Sep 17 00:00:00 2001 From: Cedrik Neumann Date: Fri, 19 Jan 2024 15:03:35 +0100 Subject: [PATCH 4/5] fix(BigQueryToGCSOperator): pass impersonation chain to BigQueryInsertJobTrigger The operator already accepts `impersonation_chain`, but does not pass it to the BigQueryInsertJobTrigger. --- airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index 58456b10f9816..3ede4db32ff81 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -261,6 +261,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=self._job_id, project_id=self.project_id or self.hook.project_id, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", ) From f4cfaad739963eb49a20c5fdb3ec42d7ce275e65 Mon Sep 17 00:00:00 2001 From: Cedrik Neumann Date: Fri, 19 Jan 2024 15:05:42 +0100 Subject: [PATCH 5/5] fix(GCSToBigQueryOperator): pass impersonation chain to BigQueryInsertJobTrigger The operator already accepts `impersonation_chain`, but does not pass it to the BigQueryInsertJobTrigger. --- airflow/providers/google/cloud/transfers/gcs_to_bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index b22b7d8f8da54..9d8ce53f4c1b2 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -435,6 +435,7 @@ def execute(self, context: Context): conn_id=self.gcp_conn_id, job_id=self.job_id, project_id=self.project_id or self.hook.project_id, + impersonation_chain=self.impersonation_chain, ), method_name="execute_complete", )