+            job_config (dict): Dictionary representation of a job's configuration, in the format used by the API.
+ """ + + if "load" in job_config: + load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr( + job_config + ) + destination = TableReference.from_api_repr( + job_config["load"]["destinationTable"] + ) + source_uris = _get_sub_prop(job_config, ["load", "sourceUris"]) + return self.load_table_from_uri( + source_uris, destination, job_config=load_job_config, retry=retry + ) + elif "copy" in job_config: + copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr( + job_config + ) + copy_resource = job_config["copy"] + destination = TableReference.from_api_repr( + copy_resource["destinationTable"] + ) + sources = [] + source_configs = copy_resource.get("sourceTables") + if source_configs is None: + source_configs = [copy_resource["sourceTable"]] + for source_config in source_configs: + table_ref = TableReference.from_api_repr(source_config) + sources.append(table_ref) + return self.copy_table( + sources, destination, job_config=copy_job_config, retry=retry + ) + elif "extract" in job_config: + extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr( + job_config + ) + source = TableReference.from_api_repr(job_config["extract"]["sourceTable"]) + destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"]) + return self.extract_table( + source, destination_uris, job_config=extract_job_config, retry=retry + ) + elif "query" in job_config: + del job_config["query"]["destinationTable"] + query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr( + job_config + ) + query = _get_sub_prop(job_config, ["query", "query"]) + return self.query(query, job_config=query_job_config, retry=retry) + else: + raise TypeError("Invalid job configuration received.") + def get_job( self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None ): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 2227183a9..5b480fb13 100644 --- a/tests/unit/test_client.py +++ 
b/tests/unit/test_client.py @@ -2796,6 +2796,114 @@ def test_delete_table_w_not_found_ok_true(self): conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) + def _create_job_helper(self, job_config, client_method): + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + client._connection = make_connection() + rf1 = mock.Mock() + get_config_patch = mock.patch( + "google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1, + ) + load_patch = mock.patch(client_method, autospec=True) + + with load_patch as client_method, get_config_patch: + client.create_job(job_config=job_config) + client_method.assert_called_once() + + def test_create_job_load_config(self): + configuration = { + "load": { + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "source_table", + }, + "sourceUris": ["gs://test_bucket/src_object*"], + } + } + + self._create_job_helper( + configuration, "google.cloud.bigquery.client.Client.load_table_from_uri" + ) + + def test_create_job_copy_config(self): + configuration = { + "copy": { + "sourceTables": [ + { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "source_table", + } + ], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "destination_table", + }, + } + } + + self._create_job_helper( + configuration, "google.cloud.bigquery.client.Client.copy_table", + ) + + def test_create_job_copy_config_w_single_source(self): + configuration = { + "copy": { + "sourceTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "source_table", + }, + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "destination_table", + }, + } + } + + self._create_job_helper( + configuration, "google.cloud.bigquery.client.Client.copy_table", + ) + + def test_create_job_extract_config(self): + configuration = { 
Subject: [PATCH 2/3] feat(bigquery): address review comments and add unit test
create_job(self, job_config, retry=DEFAULT_RETRY): load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr( job_config ) - destination = TableReference.from_api_repr( - job_config["load"]["destinationTable"] - ) + destination = _get_sub_prop(job_config, ["load", "destinationTable"]) source_uris = _get_sub_prop(job_config, ["load", "sourceUris"]) return self.load_table_from_uri( source_uris, destination, job_config=load_job_config, retry=retry @@ -1349,14 +1348,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr( job_config ) - copy_resource = job_config["copy"] - destination = TableReference.from_api_repr( - copy_resource["destinationTable"] - ) + destination = _get_sub_prop(job_config, ["copy", "destinationTable"]) sources = [] - source_configs = copy_resource.get("sourceTables") + source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"]) + if source_configs is None: - source_configs = [copy_resource["sourceTable"]] + source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])] for source_config in source_configs: table_ref = TableReference.from_api_repr(source_config) sources.append(table_ref) @@ -1367,13 +1364,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr( job_config ) - source = TableReference.from_api_repr(job_config["extract"]["sourceTable"]) + source = _get_sub_prop(job_config, ["extract", "sourceTable"]) destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"]) return self.extract_table( source, destination_uris, job_config=extract_job_config, retry=retry ) elif "query" in job_config: - del job_config["query"]["destinationTable"] + _del_sub_prop(job_config, ["query", "destinationTable"]) query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr( job_config ) diff --git a/tests/unit/test_client.py 
b/tests/unit/test_client.py index 5b480fb13..63325e95e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2893,6 +2893,68 @@ def test_create_job_query_config(self): configuration, "google.cloud.bigquery.client.Client.query", ) + def test_create_job_query_config_w_rateLimitExceeded_error(self): + from google.cloud.exceptions import Forbidden + from google.cloud.bigquery.retry import DEFAULT_RETRY + + query = "select count(*) from persons" + configuration = { + "query": { + "query": query, + "useLegacySql": False, + "destinationTable": {"tableId": "table_id"}, + } + } + resource = { + "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, + "configuration": { + "query": { + "query": query, + "useLegacySql": False, + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "query_destination_table", + }, + } + }, + } + data_without_destination = { + "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, + "configuration": {"query": {"query": query, "useLegacySql": False}}, + } + + creds = _make_credentials() + http = object() + retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + lambda exc: isinstance(exc, Forbidden) + ) + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + api_request_patcher = mock.patch.object( + client._connection, + "api_request", + side_effect=[ + Forbidden("", errors=[{"reason": "rateLimitExceeded"}]), + resource, + ], + ) + + with api_request_patcher as fake_api_request: + job = client.create_job(job_config=configuration, retry=retry) + + self.assertEqual(job.destination.table_id, "query_destination_table") + self.assertEqual(len(fake_api_request.call_args_list), 2) # was retried once + self.assertEqual( + fake_api_request.call_args_list[1], + mock.call( + method="POST", + path="/projects/PROJECT/jobs", + data=data_without_destination, + timeout=None, + ), + ) + def test_create_job_w_invalid_job_config(self): configuration = 
{"unknown": {}} creds = _make_credentials() From 583a0dd2d68b5cddf7896d7eb95f5ce9317bf86a Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Wed, 4 Mar 2020 19:03:43 +0530 Subject: [PATCH 3/3] feat(bigquery): make copy of job config for query job --- google/cloud/bigquery/client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 45b68ee09..6308e346d 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1370,11 +1370,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): source, destination_uris, job_config=extract_job_config, retry=retry ) elif "query" in job_config: - _del_sub_prop(job_config, ["query", "destinationTable"]) + copy_config = copy.deepcopy(job_config) + _del_sub_prop(copy_config, ["query", "destinationTable"]) query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr( - job_config + copy_config ) - query = _get_sub_prop(job_config, ["query", "query"]) + query = _get_sub_prop(copy_config, ["query", "query"]) return self.query(query, job_config=query_job_config, retry=retry) else: raise TypeError("Invalid job configuration received.")