Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,8 @@ def insert_job(
if not job:
raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}")
job = job.from_api_repr(job_data, client)
# Start the job and wait for it to complete and get the result.
job.result()
return job

def run_with_configuration(self, configuration: Dict) -> str:
Expand All @@ -1501,8 +1503,6 @@ def run_with_configuration(self, configuration: Dict) -> str:
DeprecationWarning
)
job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -1746,8 +1746,6 @@ def run_load(self, # pylint: disable=too-many-locals,too-many-arguments,invalid
configuration['load']['allowJaggedRows'] = allow_jagged_rows

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -1843,8 +1841,6 @@ def run_copy(self, # pylint: disable=invalid-name
] = encryption_configuration

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down Expand Up @@ -2167,8 +2163,6 @@ def run_query(self,
] = encryption_configuration

job = self.insert_job(configuration=configuration, project_id=self.project_id)
# Start the job and wait for it to complete and get the result.
job.result()
self.running_job_id = job.job_id
return job.job_id

Expand Down
36 changes: 36 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,42 @@ def test_run_query_with_arg(self, mock_insert):
{'label1': 'test1', 'label2': 'test2'}
)

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job(self, mock_client, mock_query_job):
job_conf = {
"query": {
"query": "SELECT * FROM test",
"useLegacySql": "False",
}
}
mock_query_job._JOB_TYPE = "query"

self.hook.insert_job(
configuration=job_conf,
job_id=JOB_ID,
project_id=PROJECT_ID,
location=LOCATION,
)

mock_client.assert_called_once_with(
project_id=PROJECT_ID,
location=LOCATION,
)

mock_query_job.from_api_repr.assert_called_once_with(
{
'configuration': job_conf,
'jobReference': {
'jobId': JOB_ID,
'projectId': PROJECT_ID,
'location': LOCATION
}
},
mock_client.return_value
)
mock_query_job.from_api_repr.return_value.result.assert_called_once_with()


class TestBigQueryTableSplitter(unittest.TestCase):
def test_internal_need_default_project(self):
Expand Down