Merged
67 changes: 36 additions & 31 deletions google/cloud/bigquery/job.py
@@ -767,7 +767,7 @@ def _set_future_result(self):
         # set, do not call set_result/set_exception again.
         # Note: self._result_set is set to True in set_result and
         # set_exception, in case those methods are invoked directly.
-        if self.state != _DONE_STATE or self._result_set:
+        if not self.done(reload=False) or self._result_set:
             return

         if self.error_result is not None:
@@ -776,21 +776,24 @@ def _set_future_result(self):
         else:
             self.set_result(self)

-    def done(self, retry=DEFAULT_RETRY, timeout=None):
-        """Refresh the job and checks if it is complete.
+    def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
+        """Checks if the job is complete.

         Args:
             retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
+            reload (Optional[bool]):
+                If ``True``, make an API call to refresh the job state of
+                unfinished jobs before checking. Default ``True``.

         Returns:
             bool: True if the job is complete, False otherwise.
         """
         # Do not refresh if the state is already done, as the job will not
         # change once complete.
-        if self.state != _DONE_STATE:
+        if self.state != _DONE_STATE and reload:
             self.reload(retry=retry, timeout=timeout)
         return self.state == _DONE_STATE

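Note: a minimal usage sketch of the new reload flag (illustrative only, not part of this diff; the job ID is a placeholder). The point of reload=False is that callers such as _set_future_result, which run inside the polling machinery, can check completion without issuing another RPC.

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.get_job("placeholder-job-id")  # hypothetical job ID

    # reload=False consults only the locally cached job state; no RPC is made.
    if job.done(reload=False):
        print("already finished:", job.state)
    else:
        # The default (reload=True) refreshes unfinished jobs before checking.
        print("finished after refresh:", job.done())
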
@@ -3073,7 +3076,7 @@ def estimated_bytes_processed(self):
             result = int(result)
         return result

-    def done(self, retry=DEFAULT_RETRY, timeout=None):
+    def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
         """Refreshes the job and checks if it is complete.

         Args:
@@ -3082,10 +3085,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
+            reload (Optional[bool]):
+                If ``True``, make an API call to refresh the job state of
+                unfinished jobs before checking. Default ``True``.

         Returns:
             bool: True if the job is complete, False otherwise.
         """
+        is_done = (
+            # Only consider a QueryJob complete when we know we have the final
+            # query results available.
+            self._query_results is not None

Review thread on the line above:
Contributor: checking: What about DDL statements, where we have no results? Or would this already retain the empty row iterator?
Author: DDL statements don't 404 for jobs.getQueryResults, do they? This is the full response object of jobs.getQueryResults, not just the rows.

+            and self._query_results.complete
+            and self.state == _DONE_STATE
+        )
+        # Do not refresh if the state is already done, as the job will not
+        # change once complete.
+        if not reload or is_done:
+            return is_done

         # Since the API to getQueryResults can hang up to the timeout value
         # (default of 10 seconds), set the timeout parameter to ensure that
         # the timeout from the futures API is respected. See:
@@ -3103,23 +3121,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
         # stored in _blocking_poll() in the process of polling for job completion.
         transport_timeout = timeout if timeout is not None else self._transport_timeout

-        # Do not refresh if the state is already done, as the job will not
-        # change once complete.
-        if self.state != _DONE_STATE:
-            self._query_results = self._client._get_query_results(
-                self.job_id,
-                retry,
-                project=self.project,
-                timeout_ms=timeout_ms,
-                location=self.location,
-                timeout=transport_timeout,
-            )
+        self._query_results = self._client._get_query_results(
+            self.job_id,
+            retry,
+            project=self.project,
+            timeout_ms=timeout_ms,
+            location=self.location,
+            timeout=transport_timeout,
+        )

-        # Only reload the job once we know the query is complete.
-        # This will ensure that fields such as the destination table are
-        # correctly populated.
-        if self._query_results.complete:
-            self.reload(retry=retry, timeout=transport_timeout)
+        # Only reload the job once we know the query is complete.
+        # This will ensure that fields such as the destination table are
+        # correctly populated.
+        if self._query_results.complete and self.state != _DONE_STATE:
+            self.reload(retry=retry, timeout=transport_timeout)

         return self.state == _DONE_STATE

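Note: a hedged sketch of the caller-visible effect of the reworked QueryJob.done() (assumes default credentials; not code from this PR). Each done() call now polls jobs.getQueryResults first and reloads the job resource only once the results are complete, so fields such as the destination table are populated by the time done() returns True.

    import time

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT 1 AS x")

    # done() returns True only after getQueryResults reports jobComplete
    # and the job resource has been reloaded.
    while not job.done():
        time.sleep(0.5)

    print(job.destination)  # filled in by the reload inside done()
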
@@ -3231,16 +3246,6 @@ def result(
         """
         try:
             super(QueryJob, self).result(retry=retry, timeout=timeout)
-
-            # Return an iterator instead of returning the job.
-            if not self._query_results:
-                self._query_results = self._client._get_query_results(
-                    self.job_id,
-                    retry,
-                    project=self.project,
-                    location=self.location,
-                    timeout=timeout,
-                )
         except exceptions.GoogleCloudError as exc:
             exc.message += self._format_for_exception(self.query, self.job_id)
             exc.query_job = self
101 changes: 90 additions & 11 deletions tests/unit/test_job.py
@@ -45,6 +45,8 @@
 except (ImportError, AttributeError):  # pragma: NO COVER
     tqdm = None

+import google.cloud.bigquery.query
+

 def _make_credentials():
     import google.auth.credentials
@@ -3942,10 +3944,6 @@ def _make_resource(self, started=False, ended=False):
         resource = super(TestQueryJob, self)._make_resource(started, ended)
         config = resource["configuration"]["query"]
         config["query"] = self.QUERY
-
-        if ended:
-            resource["status"] = {"state": "DONE"}
-
         return resource

     def _verifyBooleanResourceProperties(self, job, config):
@@ -4211,6 +4209,9 @@ def test_done(self):
         client = _make_client(project=self.PROJECT)
         resource = self._make_resource(ended=True)
         job = self._get_target_class().from_api_repr(resource, client)
+        job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
+            {"jobComplete": True, "jobReference": resource["jobReference"]}
+        )
         self.assertTrue(job.done())

     def test_done_w_timeout(self):
@@ -4668,35 +4669,110 @@ def test_result(self):
         from google.cloud.bigquery.table import RowIterator

         query_resource = {
+            "jobComplete": False,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+        }
+        query_resource_done = {
             "jobComplete": True,
             "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
             "totalRows": "2",
         }
+        job_resource = self._make_resource(started=True)
+        job_resource_done = self._make_resource(started=True, ended=True)
+        job_resource_done["configuration"]["query"]["destinationTable"] = {
+            "projectId": "dest-project",
+            "datasetId": "dest_dataset",
+            "tableId": "dest_table",
+        }
         tabledata_resource = {
-            # Explicitly set totalRows to be different from the query response.
-            # to test update during iteration.
+            # Explicitly set totalRows to be different from the initial
+            # response to test update during iteration.
             "totalRows": "1",
             "pageToken": None,
             "rows": [{"f": [{"v": "abc"}]}],
         }
-        connection = _make_connection(query_resource, tabledata_resource)
-        client = _make_client(self.PROJECT, connection=connection)
-        resource = self._make_resource(ended=True)
-        job = self._get_target_class().from_api_repr(resource, client)
+        conn = _make_connection(
+            query_resource, query_resource_done, job_resource_done, tabledata_resource
+        )
+        client = _make_client(self.PROJECT, connection=conn)
+        job = self._get_target_class().from_api_repr(job_resource, client)

         result = job.result()

         self.assertIsInstance(result, RowIterator)
         self.assertEqual(result.total_rows, 2)

         rows = list(result)
         self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0].col1, "abc")
         # Test that the total_rows property has changed during iteration, based
         # on the response from tabledata.list.
         self.assertEqual(result.total_rows, 1)

+        query_results_call = mock.call(
+            method="GET",
+            path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
+            query_params={"maxResults": 0},
+            timeout=None,
+        )
+        reload_call = mock.call(
+            method="GET",
+            path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
+            query_params={},
+            timeout=None,
+        )
+        tabledata_call = mock.call(
+            method="GET",
+            path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
+            query_params={},
+            timeout=None,
+        )
+        conn.api_request.assert_has_calls(
+            [query_results_call, query_results_call, reload_call, tabledata_call]
+        )

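Note: for readers unfamiliar with the helper, _make_connection (defined near the top of this test module) queues one canned payload per API request, which is what lets assert_has_calls pin down the exact RPC sequence above. A rough sketch of such a helper (reconstructed for illustration, not copied from the module):

    import mock  # these tests use the standalone mock package

    import google.cloud.bigquery._http
    from google.cloud.exceptions import NotFound

    def _make_connection(*responses):
        # Autospec the real Connection so attribute typos fail loudly.
        mock_conn = mock.create_autospec(google.cloud.bigquery._http.Connection)
        # Each api_request() call pops the next canned response; a trailing
        # NotFound guards against tests making more requests than expected.
        mock_conn.api_request.side_effect = list(responses) + [NotFound("miss")]
        return mock_conn
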
+    def test_result_with_done_job_calls_get_query_results(self):
+        query_resource_done = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "totalRows": "1",
+        }
+        job_resource = self._make_resource(started=True, ended=True)
+        job_resource["configuration"]["query"]["destinationTable"] = {
+            "projectId": "dest-project",
+            "datasetId": "dest_dataset",
+            "tableId": "dest_table",
+        }
+        tabledata_resource = {
+            "totalRows": "1",
+            "pageToken": None,
+            "rows": [{"f": [{"v": "abc"}]}],
+        }
+        conn = _make_connection(query_resource_done, tabledata_resource)
+        client = _make_client(self.PROJECT, connection=conn)
+        job = self._get_target_class().from_api_repr(job_resource, client)
+
+        result = job.result()
+
+        rows = list(result)
+        self.assertEqual(len(rows), 1)
+        self.assertEqual(rows[0].col1, "abc")
+
+        query_results_call = mock.call(
+            method="GET",
+            path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
+            query_params={"maxResults": 0},
+            timeout=None,
+        )
+        tabledata_call = mock.call(
+            method="GET",
+            path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
+            query_params={},
+            timeout=None,
+        )
+        conn.api_request.assert_has_calls([query_results_call, tabledata_call])

     def test_result_with_max_results(self):
         from google.cloud.bigquery.table import RowIterator

@@ -4938,6 +5014,9 @@ def test_result_error(self):
             "errors": [error_result],
             "state": "DONE",
         }
+        job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
+            {"jobComplete": True, "jobReference": job._properties["jobReference"]}
+        )
         job._set_future_result()

         with self.assertRaises(exceptions.GoogleCloudError) as exc_info: