From a85fe8fd994c7f12aff6fa0c422f0085cb36fbe5 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 12 Jan 2021 11:43:37 +0100 Subject: [PATCH 1/6] fix: QueryJob.exception() should *return* errors --- google/cloud/bigquery/job/query.py | 20 ++++++++++++++--- tests/unit/job/test_query.py | 36 ++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index d87f87f52..5107f684a 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -988,7 +988,9 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): unfinished jobs before checking. Default ``True``. Returns: - bool: True if the job is complete, False otherwise. + bool: True if the job is complete, False otherwise. If job status + cannot be determined due to a non-retryable error or too many + retries, the method returns ``True``. """ # Do not refresh if the state is already done, as the job will not # change once complete. @@ -996,7 +998,15 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): if not reload or is_done: return is_done - self._reload_query_results(retry=retry, timeout=timeout) + try: + self._reload_query_results(retry=retry, timeout=timeout) + except Exception as exc: + # The job state might still be "RUNNING", but if an exception bubbles + # up (either a RetryError or a non-retryable error), we declare that we + # are done for good (and the blocking pull should not continue polling). + # Ditto in a similar try-except block below. + self.set_exception(exc) + return True # If an explicit timeout is not given, fall back to the transport timeout # stored in _blocking_poll() in the process of polling for job completion. @@ -1006,7 +1016,11 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - self.reload(retry=retry, timeout=transport_timeout) + try: + self.reload(retry=retry, timeout=transport_timeout) + except Exception as exc: + self.set_exception(exc) + return True return self.state == _DONE_STATE diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index a4ab11ab6..7bca3f71b 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -356,6 +356,42 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self): call_args = fake_reload.call_args self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + def test_done_w_query_results_error(self): + client = _make_client(project=self.PROJECT) + bad_request_error = exceptions.BadRequest("Error in query") + client._get_query_results = mock.Mock(side_effect=bad_request_error) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job.reload = mock.Mock(side_effect=exceptions.RetryError) + job._exception = None + + is_done = job.done() + + assert is_done + assert job._exception is bad_request_error + + def test_done_w_job_reload_error(self): + client = _make_client(project=self.PROJECT) + query_results = google.cloud.bigquery.query._QueryResults( + properties={ + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": "12345"}, + } + ) + client._get_query_results = mock.Mock(return_value=query_results) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) + job.reload = mock.Mock(side_effect=retry_error) + job._exception = None + + is_done = job.done() + + assert is_done + assert job._exception is retry_error + def test_query_plan(self): from google.cloud._helpers import _RFC3339_MICROS from google.cloud.bigquery.job import QueryPlanEntry From 07af77494e3c39a97a238c9bb271591f1e473a47 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 13 Jan 2021 13:34:26 +0100 Subject: [PATCH 2/6] Reload query job on error, raise any reload errors --- google/cloud/bigquery/job/query.py | 33 ++++++++++++++---------------- tests/unit/job/test_query.py | 18 +++++++++------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5107f684a..f41c889be 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -988,9 +988,12 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): unfinished jobs before checking. Default ``True``. Returns: - bool: True if the job is complete, False otherwise. If job status - cannot be determined due to a non-retryable error or too many - retries, the method returns ``True``. + bool: True if the job is complete, False otherwise. + + Raises: + google.api_core.exceptions,GoogleAPICallError: + If a non-retryable error ocurrs while reloading the job metadata, + or if too many retry attempts fail. """ # Do not refresh if the state is already done, as the job will not # change once complete. @@ -998,29 +1001,23 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): if not reload or is_done: return is_done - try: - self._reload_query_results(retry=retry, timeout=timeout) - except Exception as exc: - # The job state might still be "RUNNING", but if an exception bubbles - # up (either a RetryError or a non-retryable error), we declare that we - # are done for good (and the blocking pull should not continue polling). - # Ditto in a similar try-except block below. - self.set_exception(exc) - return True - # If an explicit timeout is not given, fall back to the transport timeout # stored in _blocking_poll() in the process of polling for job completion. transport_timeout = timeout if timeout is not None else self._transport_timeout + try: + self._reload_query_results(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError: + # Reloading also updates error details on self, no need for an + # explicit self.set_exception() call. + self.reload(retry=retry, timeout=transport_timeout) + return self.state == _DONE_STATE + # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - try: - self.reload(retry=retry, timeout=transport_timeout) - except Exception as exc: - self.set_exception(exc) - return True + self.reload(retry=retry, timeout=transport_timeout) return self.state == _DONE_STATE diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 7bca3f71b..de31e7ffd 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -16,6 +16,7 @@ import copy import http import textwrap +import types import freezegun from google.api_core import exceptions @@ -363,13 +364,19 @@ def test_done_w_query_results_error(self): resource = self._make_resource(ended=False) job = self._get_target_class().from_api_repr(resource, client) - job.reload = mock.Mock(side_effect=exceptions.RetryError) job._exception = None - is_done = job.done() + def fake_reload(self, *args, **kwargs): + self._properties["status"]["state"] = "DONE" + self.set_exception(copy.copy(bad_request_error)) + + fake_reload_method = types.MethodType(fake_reload, job) + + with mock.patch.object(job, "reload", new=fake_reload_method): + is_done = job.done() assert is_done - assert job._exception is bad_request_error + assert isinstance(job._exception, exceptions.BadRequest) def test_done_w_job_reload_error(self): client = _make_client(project=self.PROJECT) @@ -387,10 +394,7 @@ def test_done_w_job_reload_error(self): job.reload = mock.Mock(side_effect=retry_error) job._exception = None - is_done = job.done() - - assert is_done - assert job._exception is retry_error + self.assertRaisesRegex(exceptions.RetryError, r"Too many retries", job.done) def test_query_plan(self): from google.cloud._helpers import _RFC3339_MICROS From a19f07e921d058699f05dd8f2b7338a3a4e3a58e Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 27 Jan 2021 15:50:58 +0100 Subject: [PATCH 3/6] Catch errors on reloading failed query jobs --- google/cloud/bigquery/job/query.py | 27 ++++++++++++++++----------- tests/unit/job/test_query.py | 25 ++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index f41c889be..a271e51f0 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -988,12 +988,8 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): unfinished jobs before checking. Default ``True``. Returns: - bool: True if the job is complete, False otherwise. - - Raises: - google.api_core.exceptions,GoogleAPICallError: - If a non-retryable error ocurrs while reloading the job metadata, - or if too many retry attempts fail. + bool: ``True`` if the job is complete or if fetching its status resulted in + an error, ``False`` otherwise. """ # Do not refresh if the state is already done, as the job will not # change once complete. @@ -1008,16 +1004,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): try: self._reload_query_results(retry=retry, timeout=transport_timeout) except exceptions.GoogleAPIError: - # Reloading also updates error details on self, no need for an - # explicit self.set_exception() call. - self.reload(retry=retry, timeout=transport_timeout) - return self.state == _DONE_STATE + # Reloading also updates error details on self, thus no need for an + # explicit self.set_exception() call if reloading succeeds. + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + self.set_exception(exc) + return True + else: + return self.state == _DONE_STATE # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - self.reload(retry=retry, timeout=transport_timeout) + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + self.set_exception(exc) + return True return self.state == _DONE_STATE diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index de31e7ffd..f662e25bc 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -357,7 +357,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self): call_args = fake_reload.call_args self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - def test_done_w_query_results_error(self): + def test_done_w_query_results_error_reload_ok(self): client = _make_client(project=self.PROJECT) bad_request_error = exceptions.BadRequest("Error in query") client._get_query_results = mock.Mock(side_effect=bad_request_error) @@ -378,7 +378,23 @@ def fake_reload(self, *args, **kwargs): assert is_done assert isinstance(job._exception, exceptions.BadRequest) - def test_done_w_job_reload_error(self): + def test_done_w_query_results_error_reload_error(self): + client = _make_client(project=self.PROJECT) + bad_request_error = exceptions.BadRequest("Error in query") + client._get_query_results = mock.Mock(side_effect=bad_request_error) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + reload_error = exceptions.DataLoss("Oops, sorry!") + job.reload = mock.Mock(side_effect=reload_error) + job._exception = None + + is_done = job.done() + + assert is_done + assert job._exception is reload_error + + def test_done_w_job_query_results_ok_reload_error(self): client = _make_client(project=self.PROJECT) query_results = google.cloud.bigquery.query._QueryResults( properties={ @@ -394,7 +410,10 @@ def test_done_w_job_reload_error(self): job.reload = mock.Mock(side_effect=retry_error) job._exception = None - self.assertRaisesRegex(exceptions.RetryError, r"Too many retries", job.done) + is_done = job.done() + + assert is_done + assert job._exception is retry_error def test_query_plan(self): from google.cloud._helpers import _RFC3339_MICROS From f3ddc144b8a254bb72f94514ef97adf200b44f86 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 27 Jan 2021 16:20:59 +0100 Subject: [PATCH 4/6] Add additional unit test --- tests/unit/job/test_query.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index f662e25bc..a16572f23 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -309,7 +309,7 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) - def test_done(self): + def test_done_job_complete(self): client = _make_client(project=self.PROJECT) resource = self._make_resource(ended=True) job = self._get_target_class().from_api_repr(resource, client) @@ -357,7 +357,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self): call_args = fake_reload.call_args self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - def test_done_w_query_results_error_reload_ok(self): + def test_done_w_query_results_error_reload_ok_job_finished(self): client = _make_client(project=self.PROJECT) bad_request_error = exceptions.BadRequest("Error in query") client._get_query_results = mock.Mock(side_effect=bad_request_error) @@ -378,6 +378,26 @@ def fake_reload(self, *args, **kwargs): assert is_done assert isinstance(job._exception, exceptions.BadRequest) + def test_done_w_query_results_error_reload_ok_job_still_running(self): + client = _make_client(project=self.PROJECT) + retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) + client._get_query_results = mock.Mock(side_effect=retry_error) + + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job._exception = None + + def fake_reload(self, *args, **kwargs): + self._properties["status"]["state"] = "RUNNING" + + fake_reload_method = types.MethodType(fake_reload, job) + + with mock.patch.object(job, "reload", new=fake_reload_method): + is_done = job.done() + + assert not is_done + assert job._exception is None + def test_done_w_query_results_error_reload_error(self): client = _make_client(project=self.PROJECT) bad_request_error = exceptions.BadRequest("Error in query") From ef2fe8e58ce5ae36ae9244c17393055ec7644214 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 28 Jan 2021 13:44:23 +0100 Subject: [PATCH 5/6] Increase retry deadline to mitigate test flakiness --- tests/unit/job/test_base.py | 2 +- tests/unit/job/test_query.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index 44bbc2c77..adccd54e8 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -943,7 +943,7 @@ def test_result_w_retry_wo_state(self): custom_predicate = mock.Mock() custom_predicate.return_value = True custom_retry = google.api_core.retry.Retry( - predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.001, + predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.1, ) self.assertIs(job.result(retry=custom_retry), job) diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index a16572f23..9cc539a90 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -1052,7 +1052,7 @@ def test_result_w_retry(self): initial=0.001, maximum=0.001, multiplier=1.0, - deadline=0.001, + deadline=0.1, predicate=custom_predicate, ) From 739627ae0b211efbae631d9968deae559ada9b25 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 23 Feb 2021 14:21:55 +0100 Subject: [PATCH 6/6] Store the more informative exception in done() --- google/cloud/bigquery/job/query.py | 6 ++++-- tests/unit/job/test_query.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index b95e4e972..5c1118500 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1004,12 +1004,14 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): try: self._reload_query_results(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError: + except exceptions.GoogleAPIError as exc: # Reloading also updates error details on self, thus no need for an # explicit self.set_exception() call if reloading succeeds. try: self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: + except exceptions.GoogleAPIError: + # Use the query results reload exception, as it generally contains + # much more useful error information. self.set_exception(exc) return True else: diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 9cc539a90..655a121e6 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -412,7 +412,7 @@ def test_done_w_query_results_error_reload_error(self): is_done = job.done() assert is_done - assert job._exception is reload_error + assert job._exception is bad_request_error def test_done_w_job_query_results_ok_reload_error(self): client = _make_client(project=self.PROJECT)