diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 88b6d09323708..709e6170b13e9 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -248,7 +248,7 @@ def __init__( # BQ config self.destination_project_dataset_table = destination_project_dataset_table - self.project_id = project_id + self.project_id = project_id if project_id is not None else self.hook.project_id self.schema_fields = schema_fields if source_format.upper() not in ALLOWED_FORMATS: raise ValueError( @@ -575,7 +575,7 @@ def _create_empty_table(self): return table_obj_api_repr def _use_existing_table(self): - self.project_id, destination_dataset, destination_table = self.hook.split_tablename( + destination_project, destination_dataset, destination_table = self.hook.split_tablename( table_input=self.destination_project_dataset_table, default_project_id=self.project_id or self.hook.project_id, var_name="destination_project_dataset_table", @@ -597,7 +597,7 @@ def _use_existing_table(self): "autodetect": self.autodetect, "createDisposition": self.create_disposition, "destinationTable": { - "projectId": self.project_id, + "projectId": destination_project, "datasetId": destination_dataset, "tableId": destination_table, }, diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py index 80958f281f630..1984d4b08831b 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py @@ -38,6 +38,7 @@ TEST_EXPLICIT_DEST = "test-project.dataset.table" TEST_BUCKET = "test-bucket" PROJECT_ID = "test-project" +OTHER_PROJECT_ID = "other-test-project" DATASET = "dataset" TABLE = "table" WRITE_DISPOSITION = "WRITE_TRUNCATE" @@ -85,6 +86,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, + project_id=OTHER_PROJECT_ID ) result = operator.execute(context=MagicMock()) @@ -93,7 +95,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.create_empty_table.assert_called_once_with( exists_ok=True, location=None, - project_id=PROJECT_ID, + project_id=OTHER_PROJECT_ID, table_resource={ "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, "labels": {}, @@ -172,7 +174,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID, retry=DEFAULT_RETRY, timeout=None, ), @@ -233,7 +235,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID retry=DEFAULT_RETRY, timeout=None, ), @@ -316,6 +318,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): schema_fields=SCHEMA_FIELDS, external_table=False, labels=LABELS, + project_id=PROJECT_ID ) operator.execute(context=MagicMock()) @@ -342,7 +345,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook): job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID retry=DEFAULT_RETRY, timeout=None, ) @@ -441,7 +444,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho fieldDelimiter=",", ), }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID location=None, job_id=pytest.real_job_id, timeout=None, @@ -545,7 +548,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully( job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID retry=DEFAULT_RETRY, timeout=None, ) @@ -645,7 +648,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID retry=DEFAULT_RETRY, timeout=None, ) @@ -842,7 +845,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self "encoding": "UTF-8", } }, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID location=None, job_id=pytest.real_job_id, timeout=None, @@ -1129,7 +1132,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self, job_id=pytest.real_job_id, location=None, nowait=True, - project_id=hook.return_value.split_tablename.return_value[0], + project_id=PROJECT_ID retry=DEFAULT_RETRY, timeout=None, )