From 90bab2bfe440b5aeb4fb68f88411823c0c5c1670 Mon Sep 17 00:00:00 2001 From: Irvi Aini Date: Mon, 26 Jun 2023 18:47:05 +0800 Subject: [PATCH] fix: Fix GCS to BigQuery Operator We have different definition for both of the project where we run the job, destination project, and hook project. We shouldn't hardcode the value of the runner project id into the destination project id. --- .../google/cloud/transfers/gcs_to_bigquery.py | 6 +++--- .../cloud/transfers/test_gcs_to_bigquery.py | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 88b6d09323708..709e6170b13e9 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -248,7 +248,7 @@ def __init__( # BQ config self.destination_project_dataset_table = destination_project_dataset_table - self.project_id = project_id + self.project_id = project_id if project_id is not None else self.hook.project_id self.schema_fields = schema_fields if source_format.upper() not in ALLOWED_FORMATS: raise ValueError( @@ -575,7 +575,7 @@ def _create_empty_table(self): return table_obj_api_repr def _use_existing_table(self): - self.project_id, destination_dataset, destination_table = self.hook.split_tablename( + destination_project, destination_dataset, destination_table = self.hook.split_tablename( table_input=self.destination_project_dataset_table, default_project_id=self.project_id or self.hook.project_id, var_name="destination_project_dataset_table", @@ -597,7 +597,7 @@ def _use_existing_table(self): "autodetect": self.autodetect, "createDisposition": self.create_disposition, "destinationTable": { - "projectId": self.project_id, + "projectId": destination_project, "datasetId": destination_dataset, "tableId": destination_table, }, diff --git 
a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 80958f281f630..1984d4b08831b 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -38,6 +38,7 @@ TEST_EXPLICIT_DEST = "test-project.dataset.table" TEST_BUCKET = "test-bucket" PROJECT_ID = "test-project" +OTHER_PROJECT_ID = "other-test-project" DATASET = "dataset" TABLE = "table" WRITE_DISPOSITION = "WRITE_TRUNCATE" @@ -85,6 +86,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, + project_id=OTHER_PROJECT_ID, ) result = operator.execute(context=MagicMock()) @@ -93,7 +95,7 @@ hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=OTHER_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -172,7 +174,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -233,7 +235,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -316,6 +318,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, external_table=False, labels=LABELS, + project_id=PROJECT_ID, ) operator.execute(context=MagicMock()) @@ -342,7 +345,7 @@ def 
test_labels_without_external_table_should_execute_successfully(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -441,7 +444,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho fieldDelimiter=",", ), }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -545,7 +548,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully( job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -645,7 +648,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ) @@ -842,7 +845,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self "encoding": "UTF-8", } }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, location=None, job_id=pytest.real_job_id, timeout=None, @@ -1129,7 +1132,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, )