From 10bdfe82028d513e3fcc6b6719733cbc0ae8f5f0 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Wed, 18 Dec 2019 16:50:31 +0000
Subject: [PATCH 1/7] Reverse argument order in job.result()

This is for internal consistency with other methods such as reload().
---
 bigquery/google/cloud/bigquery/job.py | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index d20e5b5fb11f..6f444b58cce0 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -766,16 +766,15 @@ def done(self, retry=DEFAULT_RETRY):
         self.reload(retry=retry)
         return self.state == _DONE_STATE

-    def result(self, timeout=None, retry=DEFAULT_RETRY):
+    def result(self, retry=DEFAULT_RETRY, timeout=None):
         """Start the job and wait for it to complete and get the result.

         Args:
+            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
             timeout (float):
                 How long (in seconds) to wait for job to complete before
                 raising a :class:`concurrent.futures.TimeoutError`.

-            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
-
         Returns:
             _AsyncJob: This instance.

@@ -3125,19 +3124,21 @@ def _begin(self, client=None, retry=DEFAULT_RETRY):
             raise

     def result(
-        self, timeout=None, page_size=None, retry=DEFAULT_RETRY, max_results=None
+        self, page_size=None, max_results=None, retry=DEFAULT_RETRY, timeout=None
     ):
         """Start the job and wait for it to complete and get the result.

         Args:
-            timeout (float):
-                How long (in seconds) to wait for job to complete before
-                raising a :class:`concurrent.futures.TimeoutError`.
-            page_size (int):
-                (Optional) The maximum number of rows in each page of results
-                from this request. Non-positive values are ignored.
-            retry (google.api_core.retry.Retry):
-                (Optional) How to retry the call that retrieves rows.
+            page_size (Optional[int]):
+                The maximum number of rows in each page of results from this
+                request. Non-positive values are ignored.
+            max_results (Optional[int]):
+                The maximum total number of rows from this request.
+            retry (Optional[google.api_core.retry.Retry]):
+                How to retry the call that retrieves rows.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             google.cloud.bigquery.table.RowIterator:

From c50b897fdb1a8d7ce81ea44d9492cf0f8c69746a Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 19 Dec 2019 15:23:09 +0000
Subject: [PATCH 2/7] Add TODO reminder to _AsyncJob.cancel() method

---
 bigquery/google/cloud/bigquery/job.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 6f444b58cce0..9449a8859411 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -719,6 +719,7 @@ def cancel(self, client=None):
         if self.location:
             extra_params["location"] = self.location

+        # TODO: call through client._call_api() and allow passing in a retry?
         api_response = client._connection.api_request(
             method="POST", path="%s/cancel" % (self.path,), query_params=extra_params
         )

From b7823747d8333c10aac558c04490ae99eaa879d1 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 19 Dec 2019 15:53:09 +0000
Subject: [PATCH 3/7] Add timeout argument to public methods

An exception is the Client.load_table_from_file() method, and the
methods that depend on it, because adding a timeout requires changes in
the google-resumable-media dependency.
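A rough usage sketch of the new keyword (the project, dataset, and query
values below are placeholders, not part of the diff that follows):

    from google.cloud import bigquery

    client = bigquery.Client()

    # Each HTTP request issued for this call now waits at most five
    # seconds on the underlying transport; if it times out, the
    # ``retry`` policy decides whether another attempt is made.
    dataset = client.get_dataset("my-project.my_dataset", timeout=5.0)

    # Job methods accept the same keyword, both when starting the job
    # and when polling for its result.
    job = client.query("SELECT 1", timeout=5.0)
    rows = list(job.result(timeout=5.0))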
---
 bigquery/google/cloud/bigquery/client.py | 301 ++++++++++++++++++-----
 bigquery/google/cloud/bigquery/job.py    |  58 +++--
 bigquery/tests/unit/test_client.py       | 243 +++++++++++++-----
 bigquery/tests/unit/test_job.py          |  58 +++--
 4 files changed, 511 insertions(+), 149 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 5fd7bceea973..b3f8d3e73f42 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -206,7 +206,7 @@ def close(self):
         self._http._auth_request.session.close()
         self._http.close()

-    def get_service_account_email(self, project=None):
+    def get_service_account_email(self, project=None, timeout=None):
         """Get the email address of the project's BigQuery service account

         Note:
@@ -217,6 +217,8 @@ def get_service_account_email(self, project=None):
             project (str, optional):
                 Project ID to use for retrieving service account email.
                 Defaults to the client's project.
+            timeout (Optional[float]):
+                The number of seconds to wait for the API response.

         Returns:
             str: service account email address
@@ -232,10 +234,16 @@ def get_service_account_email(self, project=None):
         if project is None:
             project = self.project
         path = "/projects/%s/serviceAccount" % (project,)
-        api_response = self._connection.api_request(method="GET", path=path)
+
+        # TODO: call through self._call_api() and allow passing in a retry?
+        api_response = self._connection.api_request(
+            method="GET", path=path, timeout=timeout
+        )
         return api_response["email"]

-    def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY):
+    def list_projects(
+        self, max_results=None, page_token=None, retry=DEFAULT_RETRY, timeout=None
+    ):
         """List projects for the project associated with this client.

         See
@@ -256,6 +264,10 @@ def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY):
             retry (google.api_core.retry.Retry):
                 (Optional) How to retry the RPC.

+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             google.api_core.page_iterator.Iterator:
                 Iterator of :class:`~google.cloud.bigquery.client.Project`
@@ -263,7 +275,7 @@ def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY):
         """
         return page_iterator.HTTPIterator(
             client=self,
-            api_request=functools.partial(self._call_api, retry),
+            api_request=functools.partial(self._call_api, retry, timeout=timeout),
             path="/projects",
             item_to_value=_item_to_project,
             items_key="projects",
@@ -279,6 +291,7 @@ def list_datasets(
         max_results=None,
         page_token=None,
         retry=DEFAULT_RETRY,
+        timeout=None,
     ):
         """List datasets for the project associated with this client.

@@ -307,6 +320,9 @@ def list_datasets(
                 :class:`~google.api_core.page_iterator.HTTPIterator`.
             retry (google.api_core.retry.Retry):
                 Optional. How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
         Returns:
             google.api_core.page_iterator.Iterator:
@@ -325,7 +341,7 @@ def list_datasets(
         path = "/projects/%s/datasets" % (project,)
         return page_iterator.HTTPIterator(
             client=self,
-            api_request=functools.partial(self._call_api, retry),
+            api_request=functools.partial(self._call_api, retry, timeout=timeout),
             path=path,
             item_to_value=_item_to_dataset,
             items_key="datasets",
@@ -366,7 +382,9 @@ def _create_bqstorage_client(self):
             credentials=self._credentials
         )

-    def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
+    def create_dataset(
+        self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
+    ):
         """API call: create the dataset via a POST request.

         See
@@ -386,6 +404,9 @@ def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
                 errors when creating the dataset.
             retry (google.api_core.retry.Retry):
                 Optional. How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             google.cloud.bigquery.dataset.Dataset:
@@ -413,14 +434,18 @@ def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
             data["location"] = self.location

         try:
-            api_response = self._call_api(retry, method="POST", path=path, data=data)
+            api_response = self._call_api(
+                retry, method="POST", path=path, data=data, timeout=timeout
+            )
             return Dataset.from_api_repr(api_response)
         except google.api_core.exceptions.Conflict:
             if not exists_ok:
                 raise
             return self.get_dataset(dataset.reference, retry=retry)

-    def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY):
+    def create_routine(
+        self, routine, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
+    ):
         """[Beta] Create a routine via a POST request.

         See
@@ -435,6 +460,9 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY):
                 errors when creating the routine.
             retry (google.api_core.retry.Retry):
                 Optional. How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             google.cloud.bigquery.routine.Routine:
@@ -447,7 +475,7 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY):
         resource = routine.to_api_repr()
         try:
             api_response = self._call_api(
-                retry, method="POST", path=path, data=resource
+                retry, method="POST", path=path, data=resource, timeout=timeout
             )
             return Routine.from_api_repr(api_response)
         except google.api_core.exceptions.Conflict:
@@ -455,7 +483,7 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY):
                 raise
             return self.get_routine(routine.reference, retry=retry)

-    def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY):
+    def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY, timeout=None):
         """API call: create a table via a POST request

         See
@@ -476,6 +504,9 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY):
                 errors when creating the table.
             retry (google.api_core.retry.Retry):
                 Optional. How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
Returns: google.cloud.bigquery.table.Table: @@ -486,7 +517,9 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY): path = "/projects/%s/datasets/%s/tables" % (table.project, table.dataset_id) data = table.to_api_repr() try: - api_response = self._call_api(retry, method="POST", path=path, data=data) + api_response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout + ) return Table.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: @@ -499,7 +532,7 @@ def _call_api(self, retry, **kwargs): call = retry(call) return call() - def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): + def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY, timeout=None): """Fetch the dataset referenced by ``dataset_ref`` Args: @@ -513,6 +546,9 @@ def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.dataset.Dataset: @@ -523,10 +559,12 @@ def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): dataset_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=dataset_ref.path) + api_response = self._call_api( + retry, method="GET", path=dataset_ref.path, timeout=timeout + ) return Dataset.from_api_repr(api_response) - def get_model(self, model_ref, retry=DEFAULT_RETRY): + def get_model(self, model_ref, retry=DEFAULT_RETRY, timeout=None): """[Beta] Fetch the model referenced by ``model_ref``. Args: @@ -540,6 +578,9 @@ def get_model(self, model_ref, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.model.ModelReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.model.Model: A ``Model`` instance. @@ -549,10 +590,12 @@ def get_model(self, model_ref, retry=DEFAULT_RETRY): model_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=model_ref.path) + api_response = self._call_api( + retry, method="GET", path=model_ref.path, timeout=timeout + ) return Model.from_api_repr(api_response) - def get_routine(self, routine_ref, retry=DEFAULT_RETRY): + def get_routine(self, routine_ref, retry=DEFAULT_RETRY, timeout=None): """[Beta] Get the routine referenced by ``routine_ref``. Args: @@ -567,6 +610,9 @@ def get_routine(self, routine_ref, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.routine.RoutineReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.routine.Routine: @@ -577,10 +623,12 @@ def get_routine(self, routine_ref, retry=DEFAULT_RETRY): routine_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=routine_ref.path) + api_response = self._call_api( + retry, method="GET", path=routine_ref.path, timeout=timeout + ) return Routine.from_api_repr(api_response) - def get_table(self, table, retry=DEFAULT_RETRY): + def get_table(self, table, retry=DEFAULT_RETRY, timeout=None): """Fetch the table referenced by ``table``. 
Args: @@ -595,16 +643,21 @@ def get_table(self, table, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.table.TableReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.table.Table: A ``Table`` instance. """ table_ref = _table_arg_to_table_ref(table, default_project=self.project) - api_response = self._call_api(retry, method="GET", path=table_ref.path) + api_response = self._call_api( + retry, method="GET", path=table_ref.path, timeout=timeout + ) return Table.from_api_repr(api_response) - def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): + def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY, timeout=None): """Change some fields of a dataset. Use ``fields`` to specify which fields to update. At least one field @@ -625,6 +678,9 @@ def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): The properties of ``dataset`` to change (e.g. "friendly_name"). retry (google.api_core.retry.Retry, optional): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.dataset.Dataset: @@ -636,11 +692,16 @@ def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=dataset.path, data=partial, headers=headers + retry, + method="PATCH", + path=dataset.path, + data=partial, + headers=headers, + timeout=timeout, ) return Dataset.from_api_repr(api_response) - def update_model(self, model, fields, retry=DEFAULT_RETRY): + def update_model(self, model, fields, retry=DEFAULT_RETRY, timeout=None): """[Beta] Change some fields of a model. Use ``fields`` to specify which fields to update. At least one field @@ -660,6 +721,9 @@ def update_model(self, model, fields, retry=DEFAULT_RETRY): properties (e.g. "friendly_name"). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.model.Model: @@ -671,11 +735,16 @@ def update_model(self, model, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=model.path, data=partial, headers=headers + retry, + method="PATCH", + path=model.path, + data=partial, + headers=headers, + timeout=timeout, ) return Model.from_api_repr(api_response) - def update_routine(self, routine, fields, retry=DEFAULT_RETRY): + def update_routine(self, routine, fields, retry=DEFAULT_RETRY, timeout=None): """[Beta] Change some fields of a routine. Use ``fields`` to specify which fields to update. At least one field @@ -702,6 +771,9 @@ def update_routine(self, routine, fields, retry=DEFAULT_RETRY): (e.g. ``type_``). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
Returns: google.cloud.bigquery.routine.Routine: @@ -717,11 +789,16 @@ def update_routine(self, routine, fields, retry=DEFAULT_RETRY): partial["routineReference"] = routine.reference.to_api_repr() api_response = self._call_api( - retry, method="PUT", path=routine.path, data=partial, headers=headers + retry, + method="PUT", + path=routine.path, + data=partial, + headers=headers, + timeout=timeout, ) return Routine.from_api_repr(api_response) - def update_table(self, table, fields, retry=DEFAULT_RETRY): + def update_table(self, table, fields, retry=DEFAULT_RETRY, timeout=None): """Change some fields of a table. Use ``fields`` to specify which fields to update. At least one field @@ -741,6 +818,9 @@ def update_table(self, table, fields, retry=DEFAULT_RETRY): properties (e.g. "friendly_name"). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.table.Table: @@ -752,12 +832,22 @@ def update_table(self, table, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=table.path, data=partial, headers=headers + retry, + method="PATCH", + path=table.path, + data=partial, + headers=headers, + timeout=timeout, ) return Table.from_api_repr(api_response) def list_models( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """[Beta] List models in the dataset. @@ -786,6 +876,9 @@ def list_models( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -804,7 +897,7 @@ def list_models( path = "%s/models" % dataset.path result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_model, items_key="models", @@ -815,7 +908,12 @@ def list_models( return result def list_routines( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """[Beta] List routines in the dataset. @@ -844,6 +942,9 @@ def list_routines( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -862,7 +963,7 @@ def list_routines( path = "{}/routines".format(dataset.path) result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_routine, items_key="routines", @@ -873,7 +974,12 @@ def list_routines( return result def list_tables( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """List tables in the dataset. 
@@ -902,6 +1008,9 @@ def list_tables( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -920,7 +1029,7 @@ def list_tables( path = "%s/tables" % dataset.path result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_table, items_key="tables", @@ -931,7 +1040,12 @@ def list_tables( return result def delete_dataset( - self, dataset, delete_contents=False, retry=DEFAULT_RETRY, not_found_ok=False + self, + dataset, + delete_contents=False, + retry=DEFAULT_RETRY, + timeout=None, + not_found_ok=False, ): """Delete a dataset. @@ -954,6 +1068,9 @@ def delete_dataset( Default is False. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the dataset. @@ -972,13 +1089,19 @@ def delete_dataset( try: self._call_api( - retry, method="DELETE", path=dataset.path, query_params=params + retry, + method="DELETE", + path=dataset.path, + query_params=params, + timeout=timeout, ) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_model( + self, model, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """[Beta] Delete a model See @@ -996,6 +1119,9 @@ def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.model.ModelReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the model. @@ -1007,12 +1133,14 @@ def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("model must be a Model or a ModelReference") try: - self._call_api(retry, method="DELETE", path=model.path) + self._call_api(retry, method="DELETE", path=model.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_routine( + self, routine, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """[Beta] Delete a routine. See @@ -1030,6 +1158,9 @@ def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.routine.RoutineReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the routine. 
@@ -1043,12 +1174,14 @@ def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("routine must be a Routine or a RoutineReference") try: - self._call_api(retry, method="DELETE", path=routine.path) + self._call_api(retry, method="DELETE", path=routine.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_table( + self, table, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """Delete a table See @@ -1066,6 +1199,9 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.table.TableReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the table. @@ -1075,7 +1211,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("Unable to get TableReference for table '{}'".format(table)) try: - self._call_api(retry, method="DELETE", path=table.path) + self._call_api(retry, method="DELETE", path=table.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1098,7 +1234,7 @@ def _get_query_results( location (str): Location of the query job. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport - before retrying the HTTP request. + before using ``retry``. Returns: google.cloud.bigquery.query._QueryResults: @@ -1155,7 +1291,9 @@ def job_from_resource(self, resource): return job.QueryJob.from_api_repr(resource, self) return job.UnknownJob.from_api_repr(resource, self) - def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): + def get_job( + self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None + ): """Fetch a job for the project associated with this client. See @@ -1171,6 +1309,9 @@ def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): location (str): Location where the job was run. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: Union[ \ @@ -1195,12 +1336,14 @@ def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): path = "/projects/{}/jobs/{}".format(project, job_id) resource = self._call_api( - retry, method="GET", path=path, query_params=extra_params + retry, method="GET", path=path, query_params=extra_params, timeout=timeout ) return self.job_from_resource(resource) - def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): + def cancel_job( + self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None + ): """Attempt to cancel a job from a job ID. See @@ -1216,6 +1359,9 @@ def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): location (str): Location where the job was run. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
         Returns:
             Union[ \
                 google.cloud.bigquery.job.LoadJob, \
                 google.cloud.bigquery.job.CopyJob, \
                 google.cloud.bigquery.job.ExtractJob, \
                 google.cloud.bigquery.job.QueryJob \
             ]:
                 Job instance, based on the resource returned by the API.
@@ -1240,7 +1386,7 @@ def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY):
         path = "/projects/{}/jobs/{}/cancel".format(project, job_id)

         resource = self._call_api(
-            retry, method="POST", path=path, query_params=extra_params
+            retry, method="POST", path=path, query_params=extra_params, timeout=timeout
         )

         return self.job_from_resource(resource["job"])

@@ -1254,6 +1400,7 @@ def list_jobs(
         all_users=None,
         state_filter=None,
         retry=DEFAULT_RETRY,
+        timeout=None,
         min_creation_time=None,
         max_creation_time=None,
     ):
@@ -1290,6 +1437,9 @@ def list_jobs(
                 * ``"running"``
             retry (Optional[google.api_core.retry.Retry]):
                 How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
             min_creation_time (Optional[datetime.datetime]):
                 Min value for job creation time. If set, only jobs created
                 after or at this timestamp are returned. If the datetime has
@@ -1329,7 +1479,7 @@ def list_jobs(
         path = "/projects/%s/jobs" % (project,)
         return page_iterator.HTTPIterator(
             client=self,
-            api_request=functools.partial(self._call_api, retry),
+            api_request=functools.partial(self._call_api, retry, timeout=timeout),
             path=path,
             item_to_value=_item_to_job,
             items_key="jobs",
@@ -1348,6 +1498,7 @@ def load_table_from_uri(
         project=None,
         job_config=None,
         retry=DEFAULT_RETRY,
+        timeout=None,
     ):
         """Starts a job for loading data into a table from Cloud Storage.

@@ -1384,6 +1535,9 @@ def load_table_from_uri(
                 (Optional) Extra configuration options for the job.
             retry (google.api_core.retry.Retry):
                 (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -1413,7 +1567,7 @@ def load_table_from_uri(
         _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)

         load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
-        load_job._begin(retry=retry)
+        load_job._begin(retry=retry, timeout=timeout)

         return load_job

@@ -1918,6 +2072,7 @@ def copy_table(
         project=None,
         job_config=None,
         retry=DEFAULT_RETRY,
+        timeout=None,
     ):
         """Copy one or more tables to another table.

@@ -1961,6 +2116,9 @@ def copy_table(
                 (Optional) Extra configuration options for the job.
             retry (google.api_core.retry.Retry):
                 (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             google.cloud.bigquery.job.CopyJob: A new copy job instance.
@@ -2004,7 +2162,7 @@ def copy_table(
         copy_job = job.CopyJob(
             job_ref, sources, destination, client=self, job_config=job_config
         )
-        copy_job._begin(retry=retry)
+        copy_job._begin(retry=retry, timeout=timeout)

         return copy_job

@@ -2018,6 +2176,7 @@ def extract_table(
         project=None,
         job_config=None,
         retry=DEFAULT_RETRY,
+        timeout=None,
     ):
         """Start a job to extract a table into Cloud Storage files.

@@ -2052,6 +2211,9 @@ def extract_table(
                 (Optional) Extra configuration options for the job.
             retry (google.api_core.retry.Retry):
                 (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Args:
             source (google.cloud.bigquery.table.TableReference):
                 table to be extracted.
@@ -2086,7 +2248,7 @@ def extract_table( extract_job = job.ExtractJob( job_ref, source, destination_uris, client=self, job_config=job_config ) - extract_job._begin(retry=retry) + extract_job._begin(retry=retry, timeout=timeout) return extract_job @@ -2099,6 +2261,7 @@ def query( location=None, project=None, retry=DEFAULT_RETRY, + timeout=None, ): """Run a SQL query. @@ -2129,6 +2292,9 @@ def query( to the client's project. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. @@ -2169,7 +2335,7 @@ def query( job_ref = job._JobReference(job_id, project=project, location=location) query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) - query_job._begin(retry=retry) + query_job._begin(retry=retry, timeout=timeout) return query_job @@ -2290,6 +2456,7 @@ def insert_rows_json( ignore_unknown_values=None, template_suffix=None, retry=DEFAULT_RETRY, + timeout=None, ): """Insert rows into a table without applying local type conversions. @@ -2326,6 +2493,9 @@ def insert_rows_json( https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: Sequence[Mappings]: @@ -2359,7 +2529,11 @@ def insert_rows_json( # We can always retry, because every row has an insert ID. response = self._call_api( - retry, method="POST", path="%s/insertAll" % table.path, data=data + retry, + method="POST", + path="%s/insertAll" % table.path, + data=data, + timeout=timeout, ) errors = [] @@ -2368,7 +2542,7 @@ def insert_rows_json( return errors - def list_partitions(self, table, retry=DEFAULT_RETRY): + def list_partitions(self, table, retry=DEFAULT_RETRY, timeout=None): """List the partitions in a table. Args: @@ -2380,23 +2554,31 @@ def list_partitions(self, table, retry=DEFAULT_RETRY): The table or reference from which to get partition info retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: List[str]: A list of the partition ids present in the partitioned table """ + # TODO: split timeout between all API calls in the method table = _table_arg_to_table_ref(table, default_project=self.project) meta_table = self.get_table( TableReference( self.dataset(table.dataset_id, project=table.project), "%s$__PARTITIONS_SUMMARY__" % table.table_id, - ) + ), + retry=retry, + timeout=timeout, ) subset = [col for col in meta_table.schema if col.name == "partition_id"] return [ row[0] - for row in self.list_rows(meta_table, selected_fields=subset, retry=retry) + for row in self.list_rows( + meta_table, selected_fields=subset, retry=retry, timeout=timeout + ) ] def list_rows( @@ -2408,6 +2590,7 @@ def list_rows( start_index=None, page_size=None, retry=DEFAULT_RETRY, + timeout=None, ): """List the rows of the table. @@ -2452,6 +2635,9 @@ def list_rows( to a sensible value set by the API. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
        Returns:
            google.cloud.bigquery.table.RowIterator:
                Iterator of row data
                :class:`~google.cloud.bigquery.table.Row`-s. During each
                page, the iterator will have the ``total_rows`` attribute
                set, which counts the total number of rows **in the table**
                (this is distinct from the total number of rows in the
                current page: ``iterator.page.num_items``).
        """
+        # TODO: split timeout between all internal API calls
        table = _table_arg_to_table(table, default_project=self.project)

        if not isinstance(table, Table):
@@ -2476,7 +2663,7 @@ def list_rows(
        # No schema, but no selected_fields. Assume the developer wants all
        # columns, so get the table resource for them rather than failing.
        elif len(schema) == 0:
-            table = self.get_table(table.reference, retry=retry)
+            table = self.get_table(table.reference, retry=retry, timeout=timeout)
            schema = table.schema

        params = {}
@@ -2487,7 +2674,7 @@ def list_rows(

        row_iterator = RowIterator(
            client=self,
-            api_request=functools.partial(self._call_api, retry),
+            api_request=functools.partial(self._call_api, retry, timeout=timeout),
            path="%s/data" % (table.path,),
            schema=schema,
            page_token=page_token,
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 9449a8859411..7a5fe6decafb 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -607,7 +607,7 @@ def to_api_repr(self):

     _build_resource = to_api_repr  # backward-compatibility alias

-    def _begin(self, client=None, retry=DEFAULT_RETRY):
+    def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
         """API call: begin the job via a POST request

         See
@@ -619,6 +619,9 @@ def _begin(self, client=None, retry=DEFAULT_RETRY):
             associated with the job object or ``NoneType``
             retry (Optional[google.api_core.retry.Retry]):
                 How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Raises:
             ValueError:
@@ -633,11 +636,11 @@ def _begin(self, client=None, retry=DEFAULT_RETRY):
         # jobs.insert is idempotent because we ensure that every new
         # job has an ID.
         api_response = client._call_api(
-            retry, method="POST", path=path, data=self.to_api_repr()
+            retry, method="POST", path=path, data=self.to_api_repr(), timeout=timeout
         )
         self._set_properties(api_response)

-    def exists(self, client=None, retry=DEFAULT_RETRY):
+    def exists(self, client=None, retry=DEFAULT_RETRY, timeout=None):
         """API call: test for the existence of the job via a GET request

         See
@@ -649,6 +652,9 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
             ``client`` stored on the current dataset.
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.

         Returns:
             bool: Boolean indicating existence of the job.
@@ -661,7 +667,11 @@ def exists(self, client=None, retry=DEFAULT_RETRY):

         try:
             client._call_api(
-                retry, method="GET", path=self.path, query_params=extra_params
+                retry,
+                method="GET",
+                path=self.path,
+                query_params=extra_params,
+                timeout=timeout,
             )
         except NotFound:
             return False
@@ -682,7 +692,7 @@ def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
-                before retrying the HTTP request.
+                before using ``retry``.
""" client = self._require_client(client) @@ -699,7 +709,7 @@ def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None): ) self._set_properties(api_response) - def cancel(self, client=None): + def cancel(self, client=None, timeout=None): """API call: cancel job via a POST request See @@ -709,6 +719,8 @@ def cancel(self, client=None): client (Optional[google.cloud.bigquery.client.Client]): the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + timeout (Optional[float]): + The number of seconds to wait for the API response. Returns: bool: Boolean indicating that the cancel request was sent. @@ -721,7 +733,10 @@ def cancel(self, client=None): # TODO: call thorugh client._call_api() and allow passing in a retry? api_response = client._connection.api_request( - method="POST", path="%s/cancel" % (self.path,), query_params=extra_params + method="POST", + path="%s/cancel" % (self.path,), + query_params=extra_params, + timeout=timeout, ) self._set_properties(api_response["job"]) # The Future interface requires that we return True if the *attempt* @@ -752,11 +767,14 @@ def _set_future_result(self): else: self.set_result(self) - def done(self, retry=DEFAULT_RETRY): + def done(self, retry=DEFAULT_RETRY, timeout=None): """Refresh the job and checks if it is complete. Args: retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: bool: True if the job is complete, False otherwise. @@ -764,7 +782,7 @@ def done(self, retry=DEFAULT_RETRY): # Do not refresh is the state is already done, as the job will not # change once complete. if self.state != _DONE_STATE: - self.reload(retry=retry) + self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE def result(self, retry=DEFAULT_RETRY, timeout=None): @@ -772,9 +790,9 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): Args: retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. - timeout (float): - How long (in seconds) to wait for job to complete before raising - a :class:`concurrent.futures.TimeoutError`. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: _AsyncJob: This instance. @@ -785,8 +803,10 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): concurrent.futures.TimeoutError: if the job did not complete in the given timeout. """ + # TODO: combine _begin timeout with super().result() timeout! + # borrow timeout guard from google auth lib if self.state is None: - self._begin(retry=retry) + self._begin(retry=retry, timeout=timeout) # TODO: modify PollingFuture so it can pass a retry argument to done(). return super(_AsyncJob, self).result(timeout=timeout) @@ -3014,7 +3034,7 @@ def done(self, retry=DEFAULT_RETRY, timeout=None): How to retry the call that retrieves query results. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport - before retrying the HTTP request. + before using ``retry``. Returns: bool: True if the job is complete, False otherwise. 
@@ -3100,7 +3120,7 @@ def _format_for_exception(query, job_id): return template.format(job_id=job_id, header=header, ruler=ruler, body=body) - def _begin(self, client=None, retry=DEFAULT_RETRY): + def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): """API call: begin the job via a POST request See @@ -3112,13 +3132,16 @@ def _begin(self, client=None, retry=DEFAULT_RETRY): associated with the job object or``NoneType``. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Raises: ValueError: If the job has already begun. """ try: - super(QueryJob, self)._begin(client=client, retry=retry) + super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout) except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self @@ -3157,6 +3180,9 @@ def result( If the job did not complete in the given timeout. """ try: + # TODO: combine timeout with timeouts passed to super().result() + # and _get_query_results (total timeout shared by both) + # borrow timeout guard from google auth lib super(QueryJob, self).result(timeout=timeout) # Return an iterator instead of returning the job. diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index c9166bd5d7c0..2a5b85e13ee0 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -290,7 +290,7 @@ def test_get_service_account_email(self): service_account_email = client.get_service_account_email() - conn.api_request.assert_called_once_with(method="GET", path=path) + conn.api_request.assert_called_once_with(method="GET", path=path, timeout=None) self.assertEqual(service_account_email, email) def test_get_service_account_email_w_alternate_project(self): @@ -305,7 +305,7 @@ def test_get_service_account_email_w_alternate_project(self): service_account_email = client.get_service_account_email(project=project) - conn.api_request.assert_called_once_with(method="GET", path=path) + conn.api_request.assert_called_once_with(method="GET", path=path, timeout=None) self.assertEqual(service_account_email, email) def test_list_projects_defaults(self): @@ -351,7 +351,7 @@ def test_list_projects_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/projects", query_params={} + method="GET", path="/projects", query_params={}, timeout=None ) def test_list_projects_explicit_response_missing_projects_key(self): @@ -373,6 +373,7 @@ def test_list_projects_explicit_response_missing_projects_key(self): method="GET", path="/projects", query_params={"maxResults": 3, "pageToken": TOKEN}, + timeout=None, ) def test_list_datasets_defaults(self): @@ -422,7 +423,7 @@ def test_list_datasets_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_datasets_w_project(self): @@ -433,7 +434,10 @@ def test_list_datasets_w_project(self): list(client.list_datasets(project="other-project")) conn.api_request.assert_called_once_with( - method="GET", path="/projects/other-project/datasets", query_params={} + method="GET", + path="/projects/other-project/datasets", + query_params={}, + timeout=None, ) def test_list_datasets_explicit_response_missing_datasets_key(self): @@ -464,6 +468,7 @@ def 
test_list_datasets_explicit_response_missing_datasets_key(self): "maxResults": 3, "pageToken": TOKEN, }, + timeout=None, ) def test_dataset_with_specified_project(self): @@ -504,7 +509,9 @@ def test_get_dataset(self): dataset = client.get_dataset(dataset_ref) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=None + ) self.assertEqual(dataset.dataset_id, self.DS_ID) # Test retry. @@ -596,6 +603,7 @@ def test_create_dataset_minimal(self): }, "labels": {}, }, + timeout=None, ) def test_create_dataset_w_attrs(self): @@ -670,6 +678,7 @@ def test_create_dataset_w_attrs(self): ], "labels": LABELS, }, + timeout=None, ) def test_create_dataset_w_custom_property(self): @@ -707,6 +716,7 @@ def test_create_dataset_w_custom_property(self): "newAlphaProperty": "unreleased property", "labels": {}, }, + timeout=None, ) def test_create_dataset_w_client_location_wo_dataset_location(self): @@ -747,6 +757,7 @@ def test_create_dataset_w_client_location_wo_dataset_location(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_client_location_w_dataset_location(self): @@ -789,6 +800,7 @@ def test_create_dataset_w_client_location_w_dataset_location(self): "labels": {}, "location": OTHER_LOCATION, }, + timeout=None, ) def test_create_dataset_w_reference(self): @@ -824,6 +836,7 @@ def test_create_dataset_w_reference(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_fully_qualified_string(self): @@ -859,6 +872,7 @@ def test_create_dataset_w_fully_qualified_string(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_string(self): @@ -894,6 +908,7 @@ def test_create_dataset_w_string(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_alreadyexists_w_exists_ok_false(self): @@ -946,8 +961,9 @@ def test_create_dataset_alreadyexists_w_exists_ok_true(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ), - mock.call(method="GET", path=get_path), + mock.call(method="GET", path=get_path, timeout=None), ] ) @@ -974,6 +990,7 @@ def test_create_routine_w_minimal_resource(self): method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=None, ) self.assertEqual( actual_routine.reference, RoutineReference.from_string(full_routine_id) @@ -1004,6 +1021,7 @@ def test_create_routine_w_conflict(self): method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=None, ) def test_create_routine_w_conflict_exists_ok(self): @@ -1035,10 +1053,12 @@ def test_create_routine_w_conflict_exists_ok(self): method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=None, ), mock.call( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=None, ), ] ) @@ -1069,6 +1089,7 @@ def test_create_table_w_day_partition(self): "timePartitioning": {"type": "DAY"}, "labels": {}, }, + timeout=None, ) self.assertEqual(table.time_partitioning.type_, "DAY") self.assertEqual(got.table_id, self.TABLE_ID) @@ -1101,6 +1122,7 @@ def test_create_table_w_custom_property(self): "newAlphaProperty": "unreleased property", "labels": {}, }, + timeout=None, ) self.assertEqual(got._properties["newAlphaProperty"], "unreleased property") self.assertEqual(got.table_id, 
self.TABLE_ID) @@ -1135,6 +1157,7 @@ def test_create_table_w_encryption_configuration(self): "labels": {}, "encryptionConfiguration": {"kmsKeyName": self.KMS_KEY_NAME}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1164,6 +1187,7 @@ def test_create_table_w_day_partition_and_expire(self): "timePartitioning": {"type": "DAY", "expirationMs": "100"}, "labels": {}, }, + timeout=None, ) self.assertEqual(table.time_partitioning.type_, "DAY") self.assertEqual(table.time_partitioning.expiration_ms, 100) @@ -1237,6 +1261,7 @@ def test_create_table_w_schema_and_query(self): "view": {"query": query, "useLegacySql": False}, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) self.assertEqual(got.project, self.PROJECT) @@ -1284,6 +1309,7 @@ def test_create_table_w_external(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) self.assertEqual(got.project, self.PROJECT) @@ -1313,6 +1339,7 @@ def test_create_table_w_reference(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1338,6 +1365,7 @@ def test_create_table_w_fully_qualified_string(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1361,6 +1389,7 @@ def test_create_table_w_string(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1388,6 +1417,7 @@ def test_create_table_alreadyexists_w_exists_ok_false(self): }, "labels": {}, }, + timeout=None, ) def test_create_table_alreadyexists_w_exists_ok_true(self): @@ -1425,8 +1455,9 @@ def test_create_table_alreadyexists_w_exists_ok_true(self): }, "labels": {}, }, + timeout=None, ), - mock.call(method="GET", path=get_path), + mock.call(method="GET", path=get_path, timeout=None), ] ) @@ -1462,7 +1493,9 @@ def test_get_model(self): model_ref = client.dataset(self.DS_ID).model(self.MODEL_ID) got = client.get_model(model_ref) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=None + ) self.assertEqual(got.model_id, self.MODEL_ID) def test_get_model_w_string(self): @@ -1486,7 +1519,9 @@ def test_get_model_w_string(self): model_id = "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.MODEL_ID) got = client.get_model(model_id) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=None + ) self.assertEqual(got.model_id, self.MODEL_ID) def test_get_routine(self): @@ -1518,6 +1553,7 @@ def test_get_routine(self): conn.api_request.assert_called_once_with( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=None, ) self.assertEqual( actual_routine.reference, @@ -1548,7 +1584,9 @@ def test_get_table(self): conn = client._connection = make_connection(resource) table = client.get_table(self.TABLE_REF) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=None + ) self.assertEqual(table.table_id, self.TABLE_ID) def test_get_table_sets_user_agent(self): @@ -1636,6 +1674,7 @@ def test_update_dataset(self): }, path="/" + PATH, headers=None, + timeout=None, ) self.assertEqual(ds2.description, ds.description) self.assertEqual(ds2.friendly_name, ds.friendly_name) @@ -1671,6 +1710,7 @@ def test_update_dataset_w_custom_property(self): 
data={"newAlphaProperty": "unreleased property"}, path=path, headers=None, + timeout=None, ) self.assertEqual(dataset.dataset_id, self.DS_ID) @@ -1723,7 +1763,7 @@ def test_update_model(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=None ) self.assertEqual(updated_model.model_id, model.model_id) self.assertEqual(updated_model.description, model.description) @@ -1785,6 +1825,7 @@ def test_update_routine(self): data=sent, path="/projects/routines-project/datasets/test_routines/routines/updated_routine", headers=None, + timeout=None, ) self.assertEqual(actual_routine.arguments, routine.arguments) self.assertEqual(actual_routine.body, routine.body) @@ -1871,7 +1912,7 @@ def test_update_table(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=None ) self.assertEqual(updated_table.description, table.description) self.assertEqual(updated_table.friendly_name, table.friendly_name) @@ -1907,6 +1948,7 @@ def test_update_table_w_custom_property(self): path="/%s" % path, data={"newAlphaProperty": "unreleased property"}, headers=None, + timeout=None, ) self.assertEqual( updated_table._properties["newAlphaProperty"], "unreleased property" @@ -1935,6 +1977,7 @@ def test_update_table_only_use_legacy_sql(self): path="/%s" % path, data={"view": {"useLegacySql": True}}, headers=None, + timeout=None, ) self.assertEqual(updated_table.view_use_legacy_sql, table.view_use_legacy_sql) @@ -2008,6 +2051,7 @@ def test_update_table_w_query(self): "schema": schema_resource, }, headers=None, + timeout=None, ) def test_update_table_w_schema_None(self): @@ -2102,7 +2146,7 @@ def test_list_tables_empty(self): self.assertEqual(tables, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=None ) def test_list_models_empty(self): @@ -2120,7 +2164,7 @@ def test_list_models_empty(self): self.assertEqual(models, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=None ) def test_list_models_defaults(self): @@ -2168,7 +2212,7 @@ def test_list_models_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_models_wrong_type(self): @@ -2193,6 +2237,7 @@ def test_list_routines_empty(self): method="GET", path="/projects/test-routines/datasets/test_routines/routines", query_params={}, + timeout=None, ) def test_list_routines_defaults(self): @@ -2244,7 +2289,7 @@ def test_list_routines_defaults(self): self.assertEqual(actual_token, token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=None ) def test_list_routines_wrong_type(self): @@ -2305,7 +2350,7 @@ def test_list_tables_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_tables_explicit(self): @@ -2367,6 +2412,7 @@ def 
test_list_tables_explicit(self): method="GET", path="/%s" % PATH, query_params={"maxResults": 3, "pageToken": TOKEN}, + timeout=None, ) def test_list_tables_wrong_type(self): @@ -2388,7 +2434,7 @@ def test_delete_dataset(self): for arg in datasets: client.delete_dataset(arg) conn.api_request.assert_called_with( - method="DELETE", path="/%s" % PATH, query_params={} + method="DELETE", path="/%s" % PATH, query_params={}, timeout=None ) def test_delete_dataset_delete_contents(self): @@ -2405,6 +2451,7 @@ def test_delete_dataset_delete_contents(self): method="DELETE", path="/%s" % PATH, query_params={"deleteContents": "true"}, + timeout=None, ) def test_delete_dataset_wrong_type(self): @@ -2425,7 +2472,9 @@ def test_delete_dataset_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_dataset(self.DS_ID) - conn.api_request.assert_called_with(method="DELETE", path=path, query_params={}) + conn.api_request.assert_called_with( + method="DELETE", path=path, query_params={}, timeout=None + ) def test_delete_dataset_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}".format(self.PROJECT, self.DS_ID) @@ -2438,7 +2487,9 @@ def test_delete_dataset_w_not_found_ok_true(self): client.delete_dataset(self.DS_ID, not_found_ok=True) - conn.api_request.assert_called_with(method="DELETE", path=path, query_params={}) + conn.api_request.assert_called_with( + method="DELETE", path=path, query_params={}, timeout=None + ) def test_delete_model(self): from google.cloud.bigquery.model import Model @@ -2461,7 +2512,9 @@ def test_delete_model(self): for arg in models: client.delete_model(arg) - conn.api_request.assert_called_with(method="DELETE", path="/%s" % path) + conn.api_request.assert_called_with( + method="DELETE", path="/%s" % path, timeout=None + ) def test_delete_model_w_wrong_type(self): creds = _make_credentials() @@ -2483,7 +2536,7 @@ def test_delete_model_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_model("{}.{}".format(self.DS_ID, self.MODEL_ID)) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_model_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}/models/{}".format( @@ -2500,7 +2553,7 @@ def test_delete_model_w_not_found_ok_true(self): "{}.{}".format(self.DS_ID, self.MODEL_ID), not_found_ok=True ) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_routine(self): from google.cloud.bigquery.routine import Routine @@ -2522,6 +2575,7 @@ def test_delete_routine(self): conn.api_request.assert_called_with( method="DELETE", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=None, ) def test_delete_routine_w_wrong_type(self): @@ -2544,6 +2598,7 @@ def test_delete_routine_w_not_found_ok_false(self): conn.api_request.assert_called_with( method="DELETE", path="/projects/routines-project/datasets/test_routines/routines/test_routine", + timeout=None, ) def test_delete_routine_w_not_found_ok_true(self): @@ -2561,6 +2616,7 @@ def test_delete_routine_w_not_found_ok_true(self): conn.api_request.assert_called_with( method="DELETE", path="/projects/routines-project/datasets/test_routines/routines/test_routine", + timeout=None, ) def test_delete_table(self): @@ -2587,7 +2643,9 @@ def test_delete_table(self): for arg in tables: 
client.delete_table(arg) - conn.api_request.assert_called_with(method="DELETE", path="/%s" % path) + conn.api_request.assert_called_with( + method="DELETE", path="/%s" % path, timeout=None + ) def test_delete_table_w_wrong_type(self): creds = _make_credentials() @@ -2609,7 +2667,7 @@ def test_delete_table_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_table_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}/tables/{}".format( @@ -2626,7 +2684,7 @@ def test_delete_table_w_not_found_ok_true(self): "{}.{}".format(self.DS_ID, self.TABLE_ID), not_found_ok=True ) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_job_from_resource_unknown_type(self): from google.cloud.bigquery.job import UnknownJob @@ -2653,6 +2711,7 @@ def test_get_job_miss_w_explict_project(self): method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_get_job_miss_w_client_location(self): @@ -2671,6 +2730,7 @@ def test_get_job_miss_w_client_location(self): method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_get_job_hit(self): @@ -2713,6 +2773,7 @@ def test_get_job_hit(self): method="GET", path="/projects/PROJECT/jobs/query_job", query_params={"projection": "full"}, + timeout=None, ) def test_cancel_job_miss_w_explict_project(self): @@ -2731,6 +2792,7 @@ def test_cancel_job_miss_w_explict_project(self): method="POST", path="/projects/OTHER_PROJECT/jobs/NONESUCH/cancel", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_cancel_job_miss_w_client_location(self): @@ -2749,6 +2811,7 @@ def test_cancel_job_miss_w_client_location(self): method="POST", path="/projects/OTHER_PROJECT/jobs/NONESUCH/cancel", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_cancel_job_hit(self): @@ -2777,6 +2840,7 @@ def test_cancel_job_hit(self): method="POST", path="/projects/PROJECT/jobs/query_job/cancel", query_params={"projection": "full"}, + timeout=None, ) def test_list_jobs_defaults(self): @@ -2890,7 +2954,10 @@ def test_list_jobs_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={"projection": "full"} + method="GET", + path="/%s" % PATH, + query_params={"projection": "full"}, + timeout=None, ) def test_list_jobs_load_job_wo_sourceUris(self): @@ -2932,7 +2999,10 @@ def test_list_jobs_load_job_wo_sourceUris(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={"projection": "full"} + method="GET", + path="/%s" % PATH, + query_params={"projection": "full"}, + timeout=None, ) def test_list_jobs_explicit_missing(self): @@ -2963,6 +3033,7 @@ def test_list_jobs_explicit_missing(self): "allUsers": True, "stateFilter": "done", }, + timeout=None, ) def test_list_jobs_w_project(self): @@ -2976,6 +3047,7 @@ def test_list_jobs_w_project(self): method="GET", path="/projects/other-project/jobs", query_params={"projection": "full"}, + timeout=None, ) def 
test_list_jobs_w_time_filter(self): @@ -2999,6 +3071,7 @@ def test_list_jobs_w_time_filter(self): "minCreationTime": "1", "maxCreationTime": str(end_time_millis), }, + timeout=None, ) def test_list_jobs_w_parent_job_filter(self): @@ -3016,6 +3089,7 @@ def test_list_jobs_w_parent_job_filter(self): method="GET", path="/projects/%s/jobs" % self.PROJECT, query_params={"projection": "full", "parentJobId": "parent-job-123"}, + timeout=None, ) conn.api_request.reset_mock() @@ -3053,7 +3127,10 @@ def test_load_table_from_uri(self): # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=None, ) # the original config object should not have been modified @@ -3112,7 +3189,10 @@ def test_load_table_from_uri_w_explicit_project(self): # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_load_table_from_uri_w_client_location(self): @@ -3153,7 +3233,10 @@ def test_load_table_from_uri_w_client_location(self): # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_load_table_from_uri_w_invalid_job_config(self): @@ -3358,7 +3441,10 @@ def test_copy_table(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=None, ) self.assertIsInstance(job, CopyJob) @@ -3421,7 +3507,10 @@ def test_copy_table_w_explicit_project(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_copy_table_w_client_location(self): @@ -3468,7 +3557,10 @@ def test_copy_table_w_client_location(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_copy_table_w_source_strings(self): @@ -3556,7 +3648,10 @@ def test_copy_table_w_valid_job_config(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=None, ) self.assertIsInstance(job._configuration, CopyJobConfig) @@ -3593,7 +3688,7 @@ def test_extract_table(self): # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=RESOURCE + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=None, ) # Check the job resource. @@ -3659,7 +3754,10 @@ def test_extract_table_w_explicit_project(self): # Check that extract_table actually starts the job. 
conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_extract_table_w_client_location(self): @@ -3700,7 +3798,10 @@ def test_extract_table_w_client_location(self): # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_extract_table_generated_job_id(self): @@ -3743,6 +3844,7 @@ def test_extract_table_generated_job_id(self): self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") self.assertIsInstance(req["data"]["jobReference"]["jobId"], six.string_types) + self.assertIsNone(req["timeout"]) # Check the job resource. self.assertIsInstance(job, ExtractJob) @@ -3787,6 +3889,7 @@ def test_extract_table_w_destination_uris(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) # Check the job resource. self.assertIsInstance(job, ExtractJob) @@ -3822,6 +3925,7 @@ def test_query_defaults(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertIsInstance(sent["jobReference"]["jobId"], six.string_types) sent_config = sent["configuration"]["query"] @@ -3850,7 +3954,10 @@ def test_query_w_explicit_project(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_query_w_explicit_job_config(self): @@ -3906,7 +4013,7 @@ def test_query_w_explicit_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original config object should not have been modified @@ -3950,7 +4057,7 @@ def test_query_preserving_explicit_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original config object should not have been modified @@ -4002,7 +4109,7 @@ def test_query_preserving_explicit_default_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original default config object should not have been modified @@ -4087,7 +4194,7 @@ def test_query_w_explicit_job_config_override(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) def test_query_w_client_default_config_no_incoming(self): @@ -4128,7 +4235,7 @@ def test_query_w_client_default_config_no_incoming(self): # Check that query actually starts the job. 
conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) def test_query_w_invalid_default_job_config(self): @@ -4170,7 +4277,10 @@ def test_query_w_client_location(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_query_detect_location(self): @@ -4241,6 +4351,7 @@ def test_query_w_udf_resources(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertIsInstance(sent["jobReference"]["jobId"], six.string_types) sent_config = sent["configuration"]["query"] @@ -4296,6 +4407,7 @@ def test_query_w_query_parameters(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertEqual(sent["jobReference"]["jobId"], JOB) sent_config = sent["configuration"]["query"] @@ -4384,6 +4496,7 @@ def _row_data(row): self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/%s" % PATH) self.assertEqual(req["data"], SENT) + self.assertIsNone(req["timeout"]) def test_insert_rows_w_list_of_dictionaries(self): import datetime @@ -4448,7 +4561,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_list_of_Rows(self): @@ -4493,7 +4606,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_skip_invalid_and_ignore_unknown(self): @@ -4570,7 +4683,7 @@ def _row_data(row): errors[0]["errors"][0], RESPONSE["insertErrors"][0]["errors"][0] ) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_repeated_fields(self): @@ -4664,7 +4777,7 @@ def test_insert_rows_w_repeated_fields(self): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None, ) def test_insert_rows_w_record_schema(self): @@ -4733,7 +4846,7 @@ def test_insert_rows_w_record_schema(self): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_explicit_none_insert_ids(self): @@ -4767,7 +4880,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/{}".format(PATH), data=SENT + method="POST", path="/{}".format(PATH), data=SENT, timeout=None, ) def test_insert_rows_errors(self): @@ -4835,6 +4948,7 @@ def test_insert_rows_w_numeric(self): project, ds_id, table_id ), data=sent, + timeout=None, ) @unittest.skipIf(pandas is None, "Requires `pandas`") @@ -4910,7 +5024,9 @@ def test_insert_rows_from_dataframe(self): for call, expected_data in 
six.moves.zip_longest( actual_calls, EXPECTED_SENT_DATA ): - expected_call = mock.call(method="POST", path=API_PATH, data=expected_data) + expected_call = mock.call( + method="POST", path=API_PATH, data=expected_data, timeout=None + ) assert call == expected_call @unittest.skipIf(pandas is None, "Requires `pandas`") @@ -4955,7 +5071,9 @@ def test_insert_rows_from_dataframe_many_columns(self): } ] } - expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA) + expected_call = mock.call( + method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None + ) actual_calls = conn.api_request.call_args_list assert len(actual_calls) == 1 @@ -5007,7 +5125,7 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self): actual_calls = conn.api_request.call_args_list assert len(actual_calls) == 1 assert actual_calls[0] == mock.call( - method="POST", path=API_PATH, data=EXPECTED_SENT_DATA + method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None ) def test_insert_rows_json(self): @@ -5054,7 +5172,7 @@ def test_insert_rows_json(self): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None, ) def test_insert_rows_json_with_string_id(self): @@ -5077,6 +5195,7 @@ def test_insert_rows_json_with_string_id(self): method="POST", path="/projects/proj/datasets/dset/tables/tbl/insertAll", data=expected, + timeout=None, ) def test_insert_rows_json_w_explicit_none_insert_ids(self): @@ -5098,6 +5217,7 @@ def test_insert_rows_json_w_explicit_none_insert_ids(self): method="POST", path="/projects/proj/datasets/dset/tables/tbl/insertAll", data=expected, + timeout=None, ) def test_list_partitions(self): @@ -5218,7 +5338,7 @@ def _bigquery_timestamp_float_repr(ts_float): self.assertEqual(page_token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_rows_empty_table(self): @@ -5326,6 +5446,7 @@ def test_list_rows_repeated_fields(self): method="GET", path="/%s" % PATH, query_params={"selectedFields": "color,struct"}, + timeout=None, ) def test_list_rows_w_record_schema(self): @@ -5392,7 +5513,7 @@ def test_list_rows_w_record_schema(self): self.assertEqual(page_token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_rows_with_missing_schema(self): @@ -5446,13 +5567,15 @@ def test_list_rows_with_missing_schema(self): row_iter = client.list_rows(table) - conn.api_request.assert_called_once_with(method="GET", path=table_path) + conn.api_request.assert_called_once_with( + method="GET", path=table_path, timeout=None + ) conn.api_request.reset_mock() self.assertEqual(row_iter.total_rows, 2, msg=repr(table)) rows = list(row_iter) conn.api_request.assert_called_once_with( - method="GET", path=tabledata_path, query_params={} + method="GET", path=tabledata_path, query_params={}, timeout=None ) self.assertEqual(row_iter.total_rows, 3, msg=repr(table)) self.assertEqual(rows[0].name, "Phred Phlyntstone", msg=repr(table)) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index e732bed4dcc6..737140c86316 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -626,6 +626,7 @@ def test__begin_defaults(self): method="POST", 
path="/projects/{}/jobs".format(self.PROJECT), data=resource, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -656,6 +657,7 @@ def test__begin_explicit(self): method="POST", path="/projects/{}/jobs".format(self.PROJECT), data=resource, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -675,6 +677,7 @@ def test_exists_defaults_miss(self): method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={"fields": "id", "location": self.LOCATION}, + timeout=None, ) def test_exists_explicit_hit(self): @@ -702,6 +705,7 @@ def test_exists_explicit_hit(self): method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={"fields": "id"}, + timeout=None, ) def test_reload_defaults(self): @@ -780,6 +784,7 @@ def test_cancel_defaults(self): method="POST", path="/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID), query_params={"location": self.LOCATION}, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -804,6 +809,7 @@ def test_cancel_explicit(self): method="POST", path="/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID), query_params={}, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -874,7 +880,7 @@ def test_done_defaults_wo_state(self): self.assertFalse(job.done()) - reload_.assert_called_once_with(retry=DEFAULT_RETRY) + reload_.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) def test_done_explicit_wo_state(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -886,7 +892,7 @@ def test_done_explicit_wo_state(self): self.assertFalse(job.done(retry=retry)) - reload_.assert_called_once_with(retry=retry) + reload_.assert_called_once_with(retry=retry, timeout=None) def test_done_already(self): client = _make_client(project=self.PROJECT) @@ -905,7 +911,7 @@ def test_result_default_wo_state(self, result): self.assertIs(job.result(), result.return_value) - begin.assert_called_once_with(retry=DEFAULT_RETRY) + begin.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) result.assert_called_once_with(timeout=None) @mock.patch("google.api_core.future.polling.PollingFuture.result") @@ -917,7 +923,7 @@ def test_result_w_retry_wo_state(self, result): self.assertIs(job.result(retry=retry), result.return_value) - begin.assert_called_once_with(retry=retry) + begin.assert_called_once_with(retry=retry, timeout=None) result.assert_called_once_with(timeout=None) @mock.patch("google.api_core.future.polling.PollingFuture.result") @@ -2288,6 +2294,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2325,7 +2332,9 @@ def test_begin_w_autodetect(self): } }, } - conn.api_request.assert_called_once_with(method="POST", path=path, data=sent) + conn.api_request.assert_called_once_with( + method="POST", path=path, data=sent, timeout=None + ) self._verifyResourceProperties(job, resource) def test_begin_w_alternate_client(self): @@ -2449,7 +2458,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -2464,7 +2473,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, 
timeout=None ) def test_exists_miss_w_job_reference(self): @@ -2481,6 +2490,7 @@ def test_exists_miss_w_job_reference(self): method="GET", path="/projects/other-project/jobs/my-job-id", query_params={"fields": "id", "location": "US"}, + timeout=None, ) def test_reload_w_bound_client(self): @@ -2545,7 +2555,7 @@ def test_cancel_w_bound_client(self): job.cancel() conn.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={} + method="POST", path=PATH, query_params={}, timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2563,7 +2573,7 @@ def test_cancel_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={} + method="POST", path=PATH, query_params={}, timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2584,6 +2594,7 @@ def test_cancel_w_job_reference(self): method="POST", path="/projects/alternative-project/jobs/{}/cancel".format(self.JOB_ID), query_params={"location": "US"}, + timeout=None, ) @@ -2898,6 +2909,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2946,6 +2958,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"copy": COPY_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2961,7 +2974,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, ) def test_exists_hit_w_alternate_client(self): @@ -2978,7 +2991,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self): @@ -3259,6 +3272,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -3308,6 +3322,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"extract": EXTRACT_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -3322,7 +3337,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, ) def test_exists_hit_w_alternate_client(self): @@ -3339,7 +3354,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self): @@ -4580,12 +4595,16 @@ def test_result_w_page_size(self): conn.api_request.assert_has_calls( [ mock.call( - method="GET", path=tabledata_path, query_params={"maxResults": 3} + method="GET", + path=tabledata_path, + query_params={"maxResults": 3}, + timeout=None, ), mock.call( method="GET", path=tabledata_path, query_params={"pageToken": "some-page-token", "maxResults": 3}, + timeout=None, ), ] ) @@ -4726,6 +4745,7 @@ def test_begin_w_bound_client(self): } }, }, + 
timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4795,6 +4815,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"dryRun": True, "query": QUERY_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4845,6 +4866,7 @@ def test_begin_w_udf(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4892,6 +4914,7 @@ def test_begin_w_named_query_parameter(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4935,6 +4958,7 @@ def test_begin_w_positional_query_parameter(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -5011,6 +5035,7 @@ def test_begin_w_table_defs(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, want_resource) @@ -5043,6 +5068,7 @@ def test_dry_run_query(self): "dryRun": True, }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -5055,7 +5081,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -5070,7 +5096,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self): From a5188cb620cd0d4ea269cef75bcc77cfc2e2ad7b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 20 Dec 2019 12:59:51 +0000 Subject: [PATCH 4/7] Explicitly test timeout parameter --- bigquery/tests/unit/test_client.py | 221 +++++++++++++++++++++-------- bigquery/tests/unit/test_job.py | 51 ++++++- 2 files changed, 208 insertions(+), 64 deletions(-) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 2a5b85e13ee0..5b70298453a2 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -288,9 +288,9 @@ def test_get_service_account_email(self): resource = {"kind": "bigquery#getServiceAccountResponse", "email": email} conn = client._connection = make_connection(resource) - service_account_email = client.get_service_account_email() + service_account_email = client.get_service_account_email(timeout=7.5) - conn.api_request.assert_called_once_with(method="GET", path=path, timeout=None) + conn.api_request.assert_called_once_with(method="GET", path=path, timeout=7.5) self.assertEqual(service_account_email, email) def test_get_service_account_email_w_alternate_project(self): @@ -354,6 +354,24 @@ def test_list_projects_defaults(self): method="GET", path="/projects", query_params={}, timeout=None ) + def test_list_projects_w_timeout(self): + PROJECT_1 = "PROJECT_ONE" + TOKEN = "TOKEN" + DATA = { + "nextPageToken": TOKEN, + "projects": [], + } + creds = _make_credentials() + client = self._make_one(PROJECT_1, creds) + conn = client._connection = make_connection(DATA) + + iterator = client.list_projects(timeout=7.5) + six.next(iterator.pages) + + conn.api_request.assert_called_once_with( + method="GET", path="/projects", query_params={}, timeout=7.5 + ) + def test_list_projects_explicit_response_missing_projects_key(self): TOKEN = "TOKEN" DATA = {} @@ -426,18 +444,18 @@ def test_list_datasets_defaults(self): method="GET", 
path="/%s" % PATH, query_params={}, timeout=None ) - def test_list_datasets_w_project(self): + def test_list_datasets_w_project_and_timeout(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection({}) - list(client.list_datasets(project="other-project")) + list(client.list_datasets(project="other-project", timeout=7.5)) conn.api_request.assert_called_once_with( method="GET", path="/projects/other-project/datasets", query_params={}, - timeout=None, + timeout=7.5, ) def test_list_datasets_explicit_response_missing_datasets_key(self): @@ -507,10 +525,10 @@ def test_get_dataset(self): conn = client._connection = make_connection(resource) dataset_ref = client.dataset(self.DS_ID) - dataset = client.get_dataset(dataset_ref) + dataset = client.get_dataset(dataset_ref, timeout=7.5) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % path, timeout=None + method="GET", path="/%s" % path, timeout=7.5 ) self.assertEqual(dataset.dataset_id, self.DS_ID) @@ -586,7 +604,7 @@ def test_create_dataset_minimal(self): ds_ref = client.dataset(self.DS_ID) before = Dataset(ds_ref) - after = client.create_dataset(before) + after = client.create_dataset(before, timeout=7.5) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -603,7 +621,7 @@ def test_create_dataset_minimal(self): }, "labels": {}, }, - timeout=None, + timeout=7.5, ) def test_create_dataset_w_attrs(self): @@ -984,13 +1002,13 @@ def test_create_routine_w_minimal_resource(self): full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) - actual_routine = client.create_routine(routine) + actual_routine = client.create_routine(routine, timeout=7.5) conn.api_request.assert_called_once_with( method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, - timeout=None, + timeout=7.5, ) self.assertEqual( actual_routine.reference, RoutineReference.from_string(full_routine_id) @@ -1075,7 +1093,7 @@ def test_create_table_w_day_partition(self): table = Table(self.TABLE_REF) table.time_partitioning = TimePartitioning() - got = client.create_table(table) + got = client.create_table(table, timeout=7.5) conn.api_request.assert_called_once_with( method="POST", @@ -1089,7 +1107,7 @@ def test_create_table_w_day_partition(self): "timePartitioning": {"type": "DAY"}, "labels": {}, }, - timeout=None, + timeout=7.5, ) self.assertEqual(table.time_partitioning.type_, "DAY") self.assertEqual(got.table_id, self.TABLE_ID) @@ -1491,10 +1509,10 @@ def test_get_model(self): conn = client._connection = make_connection(resource) model_ref = client.dataset(self.DS_ID).model(self.MODEL_ID) - got = client.get_model(model_ref) + got = client.get_model(model_ref, timeout=7.5) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % path, timeout=None + method="GET", path="/%s" % path, timeout=7.5 ) self.assertEqual(got.model_id, self.MODEL_ID) @@ -1548,12 +1566,12 @@ def test_get_routine(self): client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(resource) - actual_routine = client.get_routine(routine) + actual_routine = client.get_routine(routine, timeout=7.5) conn.api_request.assert_called_once_with( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", - timeout=None, + timeout=7.5, ) self.assertEqual( actual_routine.reference, @@ -1582,10 +1600,10 @@ def 
test_get_table(self): client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) resource = self._make_table_resource() conn = client._connection = make_connection(resource) - table = client.get_table(self.TABLE_REF) + table = client.get_table(self.TABLE_REF, timeout=7.5) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % path, timeout=None + method="GET", path="/%s" % path, timeout=7.5 ) self.assertEqual(table.table_id, self.TABLE_ID) @@ -1661,7 +1679,9 @@ def test_update_dataset(self): ds.labels = LABELS ds.access_entries = [AccessEntry("OWNER", "userByEmail", "phred@example.com")] ds2 = client.update_dataset( - ds, ["description", "friendly_name", "location", "labels", "access_entries"] + ds, + ["description", "friendly_name", "location", "labels", "access_entries"], + timeout=7.5, ) conn.api_request.assert_called_once_with( method="PATCH", @@ -1674,7 +1694,7 @@ def test_update_dataset(self): }, path="/" + PATH, headers=None, - timeout=None, + timeout=7.5, ) self.assertEqual(ds2.description, ds.description) self.assertEqual(ds2.friendly_name, ds.friendly_name) @@ -1753,7 +1773,7 @@ def test_update_model(self): model.labels = {"x": "y"} updated_model = client.update_model( - model, ["description", "friendly_name", "labels", "expires"] + model, ["description", "friendly_name", "labels", "expires"], timeout=7.5 ) sent = { @@ -1763,7 +1783,7 @@ def test_update_model(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None, timeout=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=7.5 ) self.assertEqual(updated_model.model_id, model.model_id) self.assertEqual(updated_model.description, model.description) @@ -1815,6 +1835,7 @@ def test_update_routine(self): actual_routine = client.update_routine( routine, ["arguments", "language", "body", "type_", "return_type", "someNewField"], + timeout=7.5, ) # TODO: routineReference isn't needed when the Routines API supports @@ -1825,7 +1846,7 @@ def test_update_routine(self): data=sent, path="/projects/routines-project/datasets/test_routines/routines/updated_routine", headers=None, - timeout=None, + timeout=7.5, ) self.assertEqual(actual_routine.arguments, routine.arguments) self.assertEqual(actual_routine.body, routine.body) @@ -1887,7 +1908,7 @@ def test_update_table(self): table.labels = {"x": "y"} updated_table = client.update_table( - table, ["schema", "description", "friendly_name", "labels"] + table, ["schema", "description", "friendly_name", "labels"], timeout=7.5 ) sent = { @@ -1912,7 +1933,7 @@ def test_update_table(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None, timeout=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=7.5 ) self.assertEqual(updated_table.description, table.description) self.assertEqual(updated_table.friendly_name, table.friendly_name) @@ -2130,14 +2151,14 @@ def test_update_table_delete_property(self): self.assertEqual(req[1]["data"], sent) self.assertIsNone(table3.description) - def test_list_tables_empty(self): + def test_list_tables_empty_w_timeout(self): path = "/projects/{}/datasets/{}/tables".format(self.PROJECT, self.DS_ID) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) dataset = client.dataset(self.DS_ID) - iterator = client.list_tables(dataset) + iterator = client.list_tables(dataset, 
timeout=7.5) self.assertIs(iterator.dataset, dataset) page = six.next(iterator.pages) tables = list(page) @@ -2146,17 +2167,17 @@ def test_list_tables_empty(self): self.assertEqual(tables, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={}, timeout=None + method="GET", path=path, query_params={}, timeout=7.5 ) - def test_list_models_empty(self): + def test_list_models_empty_w_timeout(self): path = "/projects/{}/datasets/{}/models".format(self.PROJECT, self.DS_ID) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) dataset_id = "{}.{}".format(self.PROJECT, self.DS_ID) - iterator = client.list_models(dataset_id) + iterator = client.list_models(dataset_id, timeout=7.5) page = six.next(iterator.pages) models = list(page) token = iterator.next_page_token @@ -2164,7 +2185,7 @@ def test_list_models_empty(self): self.assertEqual(models, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={}, timeout=None + method="GET", path=path, query_params={}, timeout=7.5 ) def test_list_models_defaults(self): @@ -2221,12 +2242,12 @@ def test_list_models_wrong_type(self): with self.assertRaises(TypeError): client.list_models(client.dataset(self.DS_ID).model("foo")) - def test_list_routines_empty(self): + def test_list_routines_empty_w_timeout(self): creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) - iterator = client.list_routines("test-routines.test_routines") + iterator = client.list_routines("test-routines.test_routines", timeout=7.5) page = six.next(iterator.pages) routines = list(page) token = iterator.next_page_token @@ -2237,7 +2258,7 @@ def test_list_routines_empty(self): method="GET", path="/projects/test-routines/datasets/test_routines/routines", query_params={}, - timeout=None, + timeout=7.5, ) def test_list_routines_defaults(self): @@ -2432,9 +2453,9 @@ def test_delete_dataset(self): client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(*([{}] * len(datasets))) for arg in datasets: - client.delete_dataset(arg) + client.delete_dataset(arg, timeout=7.5) conn.api_request.assert_called_with( - method="DELETE", path="/%s" % PATH, query_params={}, timeout=None + method="DELETE", path="/%s" % PATH, query_params={}, timeout=7.5 ) def test_delete_dataset_delete_contents(self): @@ -2511,9 +2532,9 @@ def test_delete_model(self): conn = client._connection = make_connection(*([{}] * len(models))) for arg in models: - client.delete_model(arg) + client.delete_model(arg, timeout=7.5) conn.api_request.assert_called_with( - method="DELETE", path="/%s" % path, timeout=None + method="DELETE", path="/%s" % path, timeout=7.5 ) def test_delete_model_w_wrong_type(self): @@ -2571,11 +2592,11 @@ def test_delete_routine(self): conn = client._connection = make_connection(*([{}] * len(routines))) for routine in routines: - client.delete_routine(routine) + client.delete_routine(routine, timeout=7.5) conn.api_request.assert_called_with( method="DELETE", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", - timeout=None, + timeout=7.5, ) def test_delete_routine_w_wrong_type(self): @@ -2642,9 +2663,9 @@ def test_delete_table(self): conn = client._connection = make_connection(*([{}] * len(tables))) for arg in tables: - client.delete_table(arg) + 
client.delete_table(arg, timeout=7.5) conn.api_request.assert_called_with( - method="DELETE", path="/%s" % path, timeout=None + method="DELETE", path="/%s" % path, timeout=7.5 ) def test_delete_table_w_wrong_type(self): @@ -2733,7 +2754,7 @@ def test_get_job_miss_w_client_location(self): timeout=None, ) - def test_get_job_hit(self): + def test_get_job_hit_w_timeout(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import QueryJob from google.cloud.bigquery.job import WriteDisposition @@ -2762,7 +2783,7 @@ def test_get_job_hit(self): client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(ASYNC_QUERY_DATA) - job = client.get_job(JOB_ID) + job = client.get_job(JOB_ID, timeout=7.5) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) @@ -2773,7 +2794,7 @@ def test_get_job_hit(self): method="GET", path="/projects/PROJECT/jobs/query_job", query_params={"projection": "full"}, - timeout=None, + timeout=7.5, ) def test_cancel_job_miss_w_explict_project(self): @@ -2843,6 +2864,30 @@ def test_cancel_job_hit(self): timeout=None, ) + def test_cancel_job_w_timeout(self): + JOB_ID = "query_job" + QUERY = "SELECT * from test_dataset:test_table" + QUERY_JOB_RESOURCE = { + "id": "{}:{}".format(self.PROJECT, JOB_ID), + "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "state": "RUNNING", + "configuration": {"query": {"query": QUERY}}, + } + RESOURCE = {"job": QUERY_JOB_RESOURCE} + + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds) + conn = client._connection = make_connection(RESOURCE) + + client.cancel_job(JOB_ID, timeout=7.5) + + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/{}/jobs/query_job/cancel".format(self.PROJECT), + query_params={"projection": "full"}, + timeout=7.5, + ) + def test_list_jobs_defaults(self): from google.cloud.bigquery.job import CopyJob from google.cloud.bigquery.job import CreateDisposition @@ -3050,6 +3095,20 @@ def test_list_jobs_w_project(self): timeout=None, ) + def test_list_jobs_w_timeout(self): + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds) + conn = client._connection = make_connection({}) + + list(client.list_jobs(timeout=7.5)) + + conn.api_request.assert_called_once_with( + method="GET", + path="/projects/{}/jobs".format(self.PROJECT), + query_params={"projection": "full"}, + timeout=7.5, + ) + def test_list_jobs_w_time_filter(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) @@ -3122,7 +3181,7 @@ def test_load_table_from_uri(self): destination = client.dataset(self.DS_ID).table(DESTINATION) job = client.load_table_from_uri( - SOURCE_URI, destination, job_id=JOB, job_config=job_config + SOURCE_URI, destination, job_id=JOB, job_config=job_config, timeout=7.5 ) # Check that load_table_from_uri actually starts the job. @@ -3130,7 +3189,7 @@ def test_load_table_from_uri(self): method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE, - timeout=None, + timeout=7.5, ) # the original config object should not have been modified @@ -3437,14 +3496,14 @@ def test_copy_table(self): source = dataset.table(SOURCE) destination = dataset.table(DESTINATION) - job = client.copy_table(source, destination, job_id=JOB) + job = client.copy_table(source, destination, job_id=JOB, timeout=7.5) # Check that copy_table actually starts the job. 
conn.api_request.assert_called_once_with( method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE, - timeout=None, + timeout=7.5, ) self.assertIsInstance(job, CopyJob) @@ -3684,11 +3743,11 @@ def test_extract_table(self): dataset = client.dataset(self.DS_ID) source = dataset.table(SOURCE) - job = client.extract_table(source, DESTINATION, job_id=JOB) + job = client.extract_table(source, DESTINATION, job_id=JOB, timeout=7.5) # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=None, + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=7.5, ) # Check the job resource. @@ -3932,6 +3991,27 @@ def test_query_defaults(self): self.assertEqual(sent_config["query"], QUERY) self.assertFalse(sent_config["useLegacySql"]) + def test_query_w_explicit_timeout(self): + query = "select count(*) from persons" + resource = { + "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, + "configuration": {"query": {"query": query, "useLegacySql": False}}, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(resource) + + client.query(query, timeout=7.5) + + # Check that query actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/{}/jobs".format(self.PROJECT), + data=resource, + timeout=7.5, + ) + def test_query_w_explicit_project(self): job_id = "some-job-id" query = "select count(*) from persons" @@ -4422,6 +4502,31 @@ def test_query_w_query_parameters(self): }, ) + def test_insert_rows_w_timeout(self): + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection({}) + table = Table(self.TABLE_REF) + + ROWS = [ + ("Phred Phlyntstone", 32), + ("Bharney Rhubble", 33), + ] + schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + + client.insert_rows(table, ROWS, selected_fields=schema, timeout=7.5) + + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req.get("timeout"), 7.5) + def test_insert_rows_wo_schema(self): from google.cloud.bigquery.table import Table @@ -4985,7 +5090,7 @@ def test_insert_rows_from_dataframe(self): with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))): error_info = client.insert_rows_from_dataframe( - table, dataframe, chunk_size=3 + table, dataframe, chunk_size=3, timeout=7.5 ) self.assertEqual(len(error_info), 2) @@ -5025,7 +5130,7 @@ def test_insert_rows_from_dataframe(self): actual_calls, EXPECTED_SENT_DATA ): expected_call = mock.call( - method="POST", path=API_PATH, data=expected_data, timeout=None + method="POST", path=API_PATH, data=expected_data, timeout=7.5 ) assert call == expected_call @@ -5168,11 +5273,11 @@ def test_insert_rows_json(self): } with mock.patch("uuid.uuid4", side_effect=map(str, range(len(ROWS)))): - errors = client.insert_rows_json(table, ROWS) + errors = client.insert_rows_json(table, ROWS, timeout=7.5) self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT, timeout=None, + method="POST", path="/%s" % PATH, data=SENT, timeout=7.5, ) 
def test_insert_rows_json_with_string_id(self): @@ -5322,7 +5427,7 @@ def _bigquery_timestamp_float_repr(ts_float): joined = SchemaField("joined", "TIMESTAMP", mode="NULLABLE") table = Table(self.TABLE_REF, schema=[full_name, age, joined]) - iterator = client.list_rows(table) + iterator = client.list_rows(table, timeout=7.5) page = six.next(iterator.pages) rows = list(page) total_rows = iterator.total_rows @@ -5338,7 +5443,7 @@ def _bigquery_timestamp_float_repr(ts_float): self.assertEqual(page_token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={}, timeout=None + method="GET", path="/%s" % PATH, query_params={}, timeout=7.5 ) def test_list_rows_empty_table(self): diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 737140c86316..ba9a53c49027 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -650,14 +650,14 @@ def test__begin_explicit(self): call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) - job._begin(client=client, retry=retry) + job._begin(client=client, retry=retry, timeout=7.5) call_api.assert_called_once_with( retry, method="POST", path="/projects/{}/jobs".format(self.PROJECT), data=resource, - timeout=None, + timeout=7.5, ) self.assertEqual(job._properties, resource) @@ -708,6 +708,23 @@ def test_exists_explicit_hit(self): timeout=None, ) + def test_exists_w_timeout(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + PATH = "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID) + job = self._set_properties_job() + call_api = job._client._call_api = mock.Mock() + + job.exists(timeout=7.5) + + call_api.assert_called_once_with( + DEFAULT_RETRY, + method="GET", + path=PATH, + query_params={"fields": "id"}, + timeout=7.5, + ) + def test_reload_defaults(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -803,13 +820,13 @@ def test_cancel_explicit(self): client = _make_client(project=other_project) connection = client._connection = _make_connection(response) - self.assertTrue(job.cancel(client=client)) + self.assertTrue(job.cancel(client=client, timeout=7.5)) connection.api_request.assert_called_once_with( method="POST", path="/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID), query_params={}, - timeout=None, + timeout=7.5, ) self.assertEqual(job._properties, resource) @@ -890,9 +907,9 @@ def test_done_explicit_wo_state(self): reload_ = job.reload = mock.Mock() retry = DEFAULT_RETRY.with_deadline(1) - self.assertFalse(job.done(retry=retry)) + self.assertFalse(job.done(retry=retry, timeout=7.5)) - reload_.assert_called_once_with(retry=retry, timeout=None) + reload_.assert_called_once_with(retry=retry, timeout=7.5) def test_done_already(self): client = _make_client(project=self.PROJECT) @@ -4706,6 +4723,28 @@ def test__begin_error(self): expected_line = "{}:{}".format(i, line) assert expected_line in full_text + def test__begin_w_timeout(self): + PATH = "/projects/%s/jobs" % (self.PROJECT,) + RESOURCE = self._make_resource() + + conn = _make_connection(RESOURCE) + client = _make_client(project=self.PROJECT, connection=conn) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + job._begin(timeout=7.5) + + conn.api_request.assert_called_once_with( + method="POST", + path=PATH, + data={ + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "configuration": { + "query": {"query": self.QUERY, "useLegacySql": False} + }, + }, + timeout=7.5, + ) + def test_begin_w_bound_client(self): from 
google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.job import QueryJobConfig From 769c80b5184bb980980ec7e82c1e140a3823ebea Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 20 Dec 2019 16:02:09 +0000 Subject: [PATCH 5/7] Split timeout in multi-request methods If a method makes multiple requests and is given a timeout, that timeout should represent the total allowed time for all requests combined. --- bigquery/google/cloud/bigquery/client.py | 35 +++++++---- bigquery/google/cloud/bigquery/job.py | 42 ++++++++++--- bigquery/noxfile.py | 2 +- bigquery/tests/unit/test_client.py | 78 ++++++++++++++++++++++++ bigquery/tests/unit/test_job.py | 65 +++++++++++++++++++- 5 files changed, 199 insertions(+), 23 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index b3f8d3e73f42..5707d57cdb62 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -22,6 +22,7 @@ except ImportError: # Python 2.7 import collections as collections_abc +import concurrent.futures import copy import functools import gzip @@ -47,6 +48,7 @@ import google.api_core.client_options import google.api_core.exceptions from google.api_core import page_iterator +from google.auth.transport.requests import TimeoutGuard import google.cloud._helpers from google.cloud import exceptions from google.cloud.client import ClientWithProject @@ -2557,21 +2559,27 @@ def list_partitions(self, table, retry=DEFAULT_RETRY, timeout=None): timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: List[str]: A list of the partition ids present in the partitioned table """ - # TODO: split timeout between all API calls in the method table = _table_arg_to_table_ref(table, default_project=self.project) - meta_table = self.get_table( - TableReference( - self.dataset(table.dataset_id, project=table.project), - "%s$__PARTITIONS_SUMMARY__" % table.table_id, - ), - retry=retry, - timeout=timeout, - ) + + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + meta_table = self.get_table( + TableReference( + self.dataset(table.dataset_id, project=table.project), + "%s$__PARTITIONS_SUMMARY__" % table.table_id, + ), + retry=retry, + timeout=timeout, + ) + timeout = guard.remaining_timeout subset = [col for col in meta_table.schema if col.name == "partition_id"] return [ @@ -2638,6 +2646,8 @@ def list_rows( timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: google.cloud.bigquery.table.RowIterator: @@ -2648,7 +2658,6 @@ def list_rows( (this is distinct from the total number of rows in the current page: ``iterator.page.num_items``). """ - # TODO: split timeout between all internal API calls table = _table_arg_to_table(table, default_project=self.project) if not isinstance(table, Table): @@ -2663,7 +2672,11 @@ def list_rows( # No schema, but no selected_fields. Assume the developer wants all # columns, so get the table resource for them rather than failing. 
elif len(schema) == 0: - table = self.get_table(table.reference, retry=retry, timeout=timeout) + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + table = self.get_table(table.reference, retry=retry, timeout=timeout) + timeout = guard.remaining_timeout schema = table.schema params = {} diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 7a5fe6decafb..34628350c922 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -26,6 +26,7 @@ from six.moves import http_client import google.api_core.future.polling +from google.auth.transport.requests import TimeoutGuard from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud.bigquery.dataset import Dataset @@ -793,6 +794,8 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: _AsyncJob: This instance. @@ -803,10 +806,12 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): concurrent.futures.TimeoutError: if the job did not complete in the given timeout. """ - # TODO: combine _begin timeout with super().result() timeout! - # borrow timeout guard from google auth lib if self.state is None: - self._begin(retry=retry, timeout=timeout) + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + self._begin(retry=retry, timeout=timeout) + timeout = guard.remaining_timeout # TODO: modify PollingFuture so it can pass a retry argument to done(). return super(_AsyncJob, self).result(timeout=timeout) @@ -3163,6 +3168,8 @@ def result( timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: google.cloud.bigquery.table.RowIterator: @@ -3180,16 +3187,27 @@ def result( If the job did not complete in the given timeout. """ try: - # TODO: combine timeout with timeouts passed to super().result() - # and _get_query_results (total timeout shared by both) - # borrow timeout guard from google auth lib - super(QueryJob, self).result(timeout=timeout) + guard = TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) + with guard: + super(QueryJob, self).result(retry=retry, timeout=timeout) + timeout = guard.remaining_timeout # Return an iterator instead of returning the job. 
if not self._query_results: - self._query_results = self._client._get_query_results( - self.job_id, retry, project=self.project, location=self.location + guard = TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError ) + with guard: + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + location=self.location, + timeout=timeout, + ) + timeout = guard.remaining_timeout except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self @@ -3209,7 +3227,11 @@ def result( dest_table = Table(dest_table_ref, schema=schema) dest_table._properties["numRows"] = self._query_results.total_rows rows = self._client.list_rows( - dest_table, page_size=page_size, retry=retry, max_results=max_results + dest_table, + page_size=page_size, + max_results=max_results, + retry=retry, + timeout=timeout, ) rows._preserve_order = _contains_order_by(self.query) return rows diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py index 8c041fa6a178..69b96b3dc984 100644 --- a/bigquery/noxfile.py +++ b/bigquery/noxfile.py @@ -34,7 +34,7 @@ def default(session): run the tests. """ # Install all test dependencies, then install local packages in-place. - session.install("mock", "pytest", "pytest-cov") + session.install("mock", "pytest", "pytest-cov", "freezegun") for local_dep in LOCAL_DEPS: session.install("-e", local_dep) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 5b70298453a2..f757f8b691c7 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -24,6 +24,7 @@ import unittest import warnings +import freezegun import mock import requests import six @@ -5367,6 +5368,43 @@ def test_list_partitions_with_string_id(self): self.assertEqual(len(partition_list), 0) + def test_list_partitions_splitting_timeout_between_requests(self): + from google.cloud.bigquery.table import Table + + row_count = 2 + meta_info = _make_list_partitons_meta_info( + self.PROJECT, self.DS_ID, self.TABLE_ID, row_count + ) + + data = { + "totalRows": str(row_count), + "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]},], + } + creds
= _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + client._connection = make_connection(response, response) + + table = Table( + self.TABLE_REF, schema=[SchemaField("field_x", "INTEGER", mode="NULLABLE")] + ) + + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_get_table(*args, **kwargs): + frozen_time.tick(delta=1.4) + return table + + client.get_table = mock.Mock(side_effect=delayed_get_table) + + rows_iter = client.list_rows( + "{}.{}.{}".format( + self.TABLE_REF.project, + self.TABLE_REF.dataset_id, + self.TABLE_REF.table_id, + ), + timeout=5.0, + ) + six.next(rows_iter.pages) + + client.get_table.assert_called_once() + _, kwargs = client.get_table.call_args + self.assertEqual(kwargs.get("timeout"), 5.0) + + client._connection.api_request.assert_called_once() + _, kwargs = client._connection.api_request.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 3.6) + def test_list_rows_error(self): creds = _make_credentials() http = object() diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index ba9a53c49027..b796f3f73675 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -18,6 +18,7 @@ import textwrap import unittest +import freezegun import mock import pytest import requests @@ -956,6 +957,24 @@ def test_result_explicit_w_state(self, result): begin.assert_not_called() result.assert_called_once_with(timeout=timeout) + @mock.patch("google.api_core.future.polling.PollingFuture.result") + def test_result_splitting_timout_between_requests(self, result): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + begin = job._begin = mock.Mock() + retry = mock.Mock() + + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_begin(*args, **kwargs): + frozen_time.tick(delta=0.3) + + begin.side_effect = delayed_begin + job.result(retry=retry, timeout=1.0) + + begin.assert_called_once_with(retry=retry, timeout=1.0) + result.assert_called_once_with(timeout=0.7) + def test_cancelled_wo_error_result(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) @@ -4551,7 +4570,8 @@ def test_result_w_timeout(self): client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) - job.result(timeout=1.0) + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False): + job.result(timeout=1.0) self.assertEqual(len(connection.api_request.call_args_list), 3) begin_request = connection.api_request.call_args_list[0] @@ -4566,6 +4586,49 @@ def test_result_w_timeout(self): self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900) self.assertEqual(reload_request[1]["method"], "GET") + @mock.patch("google.api_core.future.polling.PollingFuture.result") + def test_result_splitting_timout_between_requests(self, polling_result): + begun_resource = self._make_resource() + query_resource = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "totalRows": "5", + } + done_resource = copy.deepcopy(begun_resource) + done_resource["status"] = {"state": "DONE"} + + connection = _make_connection(begun_resource, query_resource, done_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + 
client.list_rows = mock.Mock() + + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_result(*args, **kwargs): + frozen_time.tick(delta=0.8) + + polling_result.side_effect = delayed_result + + def delayed_get_results(*args, **kwargs): + frozen_time.tick(delta=0.5) + return orig_get_results(*args, **kwargs) + + orig_get_results = client._get_query_results + client._get_query_results = mock.Mock(side_effect=delayed_get_results) + job.result(timeout=2.0) + + polling_result.assert_called_once_with(timeout=2.0) + + client._get_query_results.assert_called_once() + _, kwargs = client._get_query_results.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 1.2) + + client.list_rows.assert_called_once() + _, kwargs = client.list_rows.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 0.7) + def test_result_w_page_size(self): # Arrange query_results_resource = { From 4cfd47438205d6adc72352d1957830e97fb9fbee Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 23 Dec 2019 15:45:33 +0000 Subject: [PATCH 6/7] Fix minor styling issue --- bigquery/tests/unit/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index f757f8b691c7..f86fe16799d3 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5378,7 +5378,7 @@ def test_list_partitions_splitting_timout_between_requests(self): data = { "totalRows": str(row_count), - "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]},], + "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]}], } creds = _make_credentials() http = object() From b01d214b2ab272b6e78a9cd2f8e2d6791acac048 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 26 Dec 2019 20:25:20 +0000 Subject: [PATCH 7/7] Add timeout with retry test for Client._call_api() --- bigquery/tests/unit/test_client.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index f86fe16799d3..a5100fe6eaef 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -213,6 +213,28 @@ def test_ctor_w_query_job_config(self): self.assertIsInstance(client._default_query_job_config, QueryJobConfig) self.assertTrue(client._default_query_job_config.dry_run) + def test__call_api_applying_custom_retry_on_timeout(self): + from concurrent.futures import TimeoutError + from google.cloud.bigquery.retry import DEFAULT_RETRY + + client = self._make_one() + + api_request_patcher = mock.patch.object( + client._connection, "api_request", side_effect=[TimeoutError, "result"], + ) + retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + lambda exc: isinstance(exc, TimeoutError) + ) + + with api_request_patcher as fake_api_request: + result = client._call_api(retry, foo="bar") + + self.assertEqual(result, "result") + self.assertEqual( + fake_api_request.call_args_list, + [mock.call(foo="bar"), mock.call(foo="bar")], # was retried once + ) + def test__get_query_results_miss_w_explicit_project_and_timeout(self): from google.cloud.exceptions import NotFound
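
A note for reviewers on the pattern these patches test: every "splitting timeout
between requests" test freezes time, ticks the clock inside the first request, and
then asserts that only the remaining budget reaches the next request. That is the
behaviour the TimeoutGuard block in the job.py hunk implements. Below is a minimal,
self-contained sketch of the idea; this TimeoutGuard is a simplified stand-in
written for illustration, not the library's actual helper, and
fetch_with_shared_deadline is a hypothetical wrapper that does not exist in the
codebase:

    import concurrent.futures
    import time


    class TimeoutGuard(object):
        """Measure how much of a shared deadline the guarded block consumed."""

        def __init__(
            self, timeout, timeout_error_type=concurrent.futures.TimeoutError
        ):
            self._timeout = timeout
            self._error_type = timeout_error_type
            self.remaining_timeout = timeout

        def __enter__(self):
            self._start = time.monotonic()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None or self._timeout is None:
                return False  # propagate in-block errors; nothing to enforce
            self.remaining_timeout = self._timeout - (time.monotonic() - self._start)
            if self.remaining_timeout <= 0:
                raise self._error_type("shared deadline exceeded")
            return False


    def fetch_with_shared_deadline(first_call, second_call, timeout=None):
        """Run two sequential calls under a single caller-supplied deadline."""
        guard = TimeoutGuard(timeout)
        with guard:
            first = first_call(timeout=timeout)
        # Whatever the first call consumed comes out of the shared budget, so
        # with timeout=5.0 and a 1.4 second first call, the second call gets 3.6.
        second = second_call(timeout=guard.remaining_timeout)
        return first, second

In the tests, freezegun's frozen clock stands in for real time passing, which is
why a 5.0 second budget minus a 1.4 second first request yields the asserted 3.6
without any actual sleeping.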