Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def __init__(

# BQ config
self.destination_project_dataset_table = destination_project_dataset_table
self.project_id = project_id
self.project_id = project_id if project_id is not None else self.hook.project_id
self.schema_fields = schema_fields
if source_format.upper() not in ALLOWED_FORMATS:
raise ValueError(
Expand Down Expand Up @@ -575,7 +575,7 @@ def _create_empty_table(self):
return table_obj_api_repr

def _use_existing_table(self):
self.project_id, destination_dataset, destination_table = self.hook.split_tablename(
destination_project, destination_dataset, destination_table = self.hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=self.project_id or self.hook.project_id,
var_name="destination_project_dataset_table",
Expand All @@ -597,7 +597,7 @@ def _use_existing_table(self):
"autodetect": self.autodetect,
"createDisposition": self.create_disposition,
"destinationTable": {
"projectId": self.project_id,
"projectId": destination_project,
"datasetId": destination_dataset,
"tableId": destination_table,
},
Expand Down
21 changes: 12 additions & 9 deletions tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
TEST_EXPLICIT_DEST = "test-project.dataset.table"
TEST_BUCKET = "test-bucket"
PROJECT_ID = "test-project"
OTHER_PROJECT_ID = "other-test-project"
DATASET = "dataset"
TABLE = "table"
WRITE_DISPOSITION = "WRITE_TRUNCATE"
Expand Down Expand Up @@ -85,6 +86,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook):
schema_fields=SCHEMA_FIELDS,
max_id_key=MAX_ID_KEY,
external_table=True,
project_id=OTHER_PROJECT_ID
)

result = operator.execute(context=MagicMock())
Expand All @@ -93,7 +95,7 @@ def test_max_value_external_table_should_execute_successfully(self, hook):
hook.return_value.create_empty_table.assert_called_once_with(
exists_ok=True,
location=None,
project_id=PROJECT_ID,
project_id=OTHER_PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
"labels": {},
Expand Down Expand Up @@ -172,7 +174,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID,
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -233,7 +235,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -316,6 +318,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook):
schema_fields=SCHEMA_FIELDS,
external_table=False,
labels=LABELS,
project_id=PROJECT_ID
)

operator.execute(context=MagicMock())
Expand All @@ -342,7 +345,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -441,7 +444,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho
fieldDelimiter=",",
),
},
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -545,7 +548,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully(
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -645,7 +648,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -842,7 +845,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self
"encoding": "UTF-8",
}
},
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -1129,7 +1132,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self,
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.split_tablename.return_value[0],
project_id=PROJECT_ID
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down