From 35e6f26521566206f0f030a573d233e1ca31cea0 Mon Sep 17 00:00:00 2001
From: Tomek Urbaszek
Date: Mon, 20 Jul 2020 19:44:55 +0200
Subject: [PATCH] Fix insert_job method of BigQueryHook

The method should submit the job and wait for the result.

Closes: #9897
---
 .../providers/google/cloud/hooks/bigquery.py  | 10 ++----
 .../google/cloud/hooks/test_bigquery.py       | 36 +++++++++++++++++++
 2 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 1ccf03b0de74a..efdd6c3027bc2 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1481,6 +1481,8 @@ def insert_job(
         if not job:
             raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}")
         job = job.from_api_repr(job_data, client)
+        # Start the job and wait for it to complete and get the result.
+        job.result()
         return job
 
     def run_with_configuration(self, configuration: Dict) -> str:
@@ -1501,8 +1503,6 @@ def run_with_configuration(self, configuration: Dict) -> str:
             DeprecationWarning
         )
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
-        # Start the job and wait for it to complete and get the result.
-        job.result()
         self.running_job_id = job.job_id
         return job.job_id
 
@@ -1746,8 +1746,6 @@ def run_load(self, # pylint: disable=too-many-locals,too-many-arguments,invalid
             configuration['load']['allowJaggedRows'] = allow_jagged_rows
 
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
-        # Start the job and wait for it to complete and get the result.
-        job.result()
         self.running_job_id = job.job_id
         return job.job_id
 
@@ -1843,8 +1841,6 @@ def run_copy(self, # pylint: disable=invalid-name
             ] = encryption_configuration
 
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
-        # Start the job and wait for it to complete and get the result.
-        job.result()
         self.running_job_id = job.job_id
         return job.job_id
 
@@ -2167,8 +2163,6 @@ def run_query(self,
             ] = encryption_configuration
 
         job = self.insert_job(configuration=configuration, project_id=self.project_id)
-        # Start the job and wait for it to complete and get the result.
-        job.result()
         self.running_job_id = job.job_id
         return job.job_id
 
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index 8c34690ac2a46..8286a8e034f1f 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -668,6 +668,42 @@ def test_run_query_with_arg(self, mock_insert):
             {'label1': 'test1', 'label2': 'test2'}
         )
 
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_insert_job(self, mock_client, mock_query_job):
+        job_conf = {
+            "query": {
+                "query": "SELECT * FROM test",
+                "useLegacySql": "False",
+            }
+        }
+        mock_query_job._JOB_TYPE = "query"
+
+        self.hook.insert_job(
+            configuration=job_conf,
+            job_id=JOB_ID,
+            project_id=PROJECT_ID,
+            location=LOCATION,
+        )
+
+        mock_client.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+        )
+
+        mock_query_job.from_api_repr.assert_called_once_with(
+            {
+                'configuration': job_conf,
+                'jobReference': {
+                    'jobId': JOB_ID,
+                    'projectId': PROJECT_ID,
+                    'location': LOCATION
+                }
+            },
+            mock_client.return_value
+        )
+        mock_query_job.from_api_repr.return_value.result.assert_called_once_with()
+
 
 class TestBigQueryTableSplitter(unittest.TestCase):
     def test_internal_need_default_project(self):