From 9fcaebc5b7d4d824f19cf95e8e29182a4956910f Mon Sep 17 00:00:00 2001 From: Linchin Date: Mon, 3 Jun 2024 18:15:49 -0700 Subject: [PATCH 1/2] fix: create query job in job.result() if doesn't exist --- google/cloud/bigquery/job/query.py | 5 ++ tests/unit/job/test_query.py | 82 ++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index a8530271a..8049b748e 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1585,6 +1585,11 @@ def is_job_done(): self._retry_do_query = retry_do_query self._job_retry = job_retry + # If the job hasn't been created, create it now. Related: + # https://github.com/googleapis/python-bigquery/issues/1940 + if self.state is None: + self._begin(retry=retry, **done_kwargs) + # Refresh the job status with jobs.get because some of the # exceptions thrown by jobs.getQueryResults like timeout and # rateLimitExceeded errors are ambiguous. We want to know if diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 66055dee1..b682fd19d 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -1037,6 +1037,85 @@ def test_result_dry_run(self): self.assertIsNone(result.job_id) self.assertIsNone(result.query_id) + # If the job doesn't exist, create the job first. Issue: + # https://github.com/googleapis/python-bigquery/issues/1940 + def test_result_begin_job_if_not_exist(self): + begun_resource = self._make_resource() + query_running_resource = { + "jobComplete": True, + "jobReference": { + "projectId": self.PROJECT, + "jobId": self.JOB_ID, + "location": "US", + }, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "status": {"state": "RUNNING"}, + } + query_done_resource = { + "jobComplete": True, + "jobReference": { + "projectId": self.PROJECT, + "jobId": self.JOB_ID, + "location": "US", + }, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "status": {"state": "DONE"}, + } + done_resource = copy.deepcopy(begun_resource) + done_resource["status"] = {"state": "DONE"} + connection = make_connection( + begun_resource, + query_running_resource, + query_done_resource, + done_resource, + ) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + job._properties["jobReference"]["location"] = "US" + + job.result() + + create_job_call = mock.call( + method="POST", + path=f"/projects/{self.PROJECT}/jobs", + data={ + "jobReference": { + "jobId": self.JOB_ID, + "projectId": self.PROJECT, + "location": "US", + }, + "configuration": { + "query": {"useLegacySql": False, "query": self.QUERY}, + }, + }, + timeout=None, + ) + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={"projection": "full", "location": "US"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, + ) + get_query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={ + "maxResults": 0, + "location": "US", + }, + timeout=None, + ) + + # assert connection.api_request.call_args == [] + connection.api_request.assert_has_calls( + [ + create_job_call, + reload_call, + get_query_results_call, + reload_call, + ] + ) + def test_result_with_done_job_calls_get_query_results(self): query_resource_done = { "jobComplete": True, @@ -1379,6 +1458,7 @@ def test_result_w_timeout_doesnt_raise(self): client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) job._properties["jobReference"]["location"] = "US" + job._properties["status"] = {"state": "RUNNING"} with freezegun.freeze_time("1970-01-01 00:00:00", tick=False): job.result( @@ -1429,6 +1509,7 @@ def test_result_w_timeout_raises_concurrent_futures_timeout(self): client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) job._properties["jobReference"]["location"] = "US" + job._properties["status"] = {"state": "RUNNING"} with freezegun.freeze_time( "1970-01-01 00:00:00", auto_tick_seconds=1.0 @@ -2319,5 +2400,6 @@ def test_iter(self): connection = make_connection(begun_resource, query_resource, done_resource) client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) + job._properties["status"] = {"state": "RUNNING"} self.assertIsInstance(iter(job), types.GeneratorType) From c194220614688c676f8abe8d7ee82742b7b8207f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Tue, 4 Jun 2024 09:10:41 -0500 Subject: [PATCH 2/2] Apply suggestions from code review --- tests/unit/job/test_query.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index b682fd19d..5b69c98cf 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -1106,9 +1106,10 @@ def test_result_begin_job_if_not_exist(self): timeout=None, ) - # assert connection.api_request.call_args == [] connection.api_request.assert_has_calls( [ + # Make sure we start a job that hasn't started yet. See: + # https://github.com/googleapis/python-bigquery/issues/1940 create_job_call, reload_call, get_query_results_call,