From 583ad56ed81f25536f2f1540e51c63dc077490c1 Mon Sep 17 00:00:00 2001
From: Hemang
Date: Wed, 20 Nov 2019 17:12:40 +0530
Subject: [PATCH 1/3] feat(bigquery): add create job method

---
 bigquery/google/cloud/bigquery/client.py | 77 +++++++++++++++++++-
 bigquery/tests/unit/test_client.py       | 89 ++++++++++++++++++++++++
 2 files changed, 165 insertions(+), 1 deletion(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index c8df21e91f55..cb121f12e117 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -74,7 +74,6 @@
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import RowIterator
 
-
 _DEFAULT_CHUNKSIZE = 1048576  # 1024 * 1024 B = 1 MB
 _MAX_MULTIPART_SIZE = 5 * 1024 * 1024
 _DEFAULT_NUM_RETRIES = 6
@@ -1127,6 +1126,82 @@ def job_from_resource(self, resource):
             return job.QueryJob.from_api_repr(resource, self)
         return job.UnknownJob.from_api_repr(resource, self)
 
+    def create_job(
+        self, job_config, source=None, destination=None, query=None, retry=DEFAULT_RETRY
+    ):
+        """Create a new job.
+        Arguments:
+            job_config (dict): configuration job representation returned from the API
+
+        Keyword Arguments:
+            source (Union[ \
+                google.cloud.bigquery.table.Table, \
+                google.cloud.bigquery.table.TableReference, \
+                str, \
+                Sequence[str]
+            ]):
+                (Optional) URIs of data files to be loaded; in format
+                ``gs:///`` or Table
+                into which data is to be loaded.
+
+            destination (Union[ \
+                google.cloud.bigquery.table.Table, \
+                google.cloud.bigquery.table.TableReference, \
+                str, \
+            ]):
+                (Optional) Table into which data is to be loaded. If a string is passed
+                in, this method attempts to create a table reference from a string using
+                :func:`google.cloud.bigquery.table.TableReference.from_string` or URIs of
+                Cloud Storage file(s) into which table data is to be extracted; in format
+                ``gs:///``.
+
+            query (str):
+                (Optional) SQL query to be executed. Defaults to the standard SQL dialect.
+                Use the ``job_config`` parameter to change dialects.
+
+            retry (google.api_core.retry.Retry):
+                (Optional) How to retry the RPC.
+
+        Returns:
+            Union[ \
+                google.cloud.bigquery.job.LoadJob, \
+                google.cloud.bigquery.job.CopyJob, \
+                google.cloud.bigquery.job.ExtractJob, \
+                google.cloud.bigquery.job.QueryJob \
+            ]:
+                A new job instance.
+        """
+
+        if "load" in job_config:
+            job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
+                job_config
+            )
+            return self.load_table_from_uri(
+                source, destination, job_config=job_config, retry=retry
+            )
+        elif "copy" in job_config:
+            job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
+                job_config
+            )
+            return self.copy_table(
+                source, destination, job_config=job_config, retry=retry
+            )
+        elif "extract" in job_config:
+            job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
+                job_config
+            )
+            return self.extract_table(
+                source, destination, job_config=job_config, retry=retry
+            )
+        elif "query" in job_config:
+            del job_config["query"]["destinationTable"]
+            job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
+                job_config
+            )
+            return self.query(query, job_config=job_config, retry=retry)
+        else:
+            raise TypeError("Invalid job configuration received.")
+
     def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY):
         """Fetch a job for the project associated with this client.
 
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index da3fb2c56689..34c1dcb3e6a4 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -46,6 +46,7 @@
 from google.cloud import bigquery_v2
 from google.cloud.bigquery.dataset import DatasetReference
 from tests.unit.helpers import make_connection
+from google.cloud.bigquery.retry import DEFAULT_RETRY
 
 
 def _make_credentials():
@@ -2584,6 +2585,94 @@ def test_delete_table_w_not_found_ok_true(self):
 
         conn.api_request.assert_called_with(method="DELETE", path=path)
 
+    def _create_job_helper(
+        self, job_config, client_method, query=None, source=None, destination=None
+    ):
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        client._connection = make_connection()
+        rf1 = mock.Mock()
+        get_config_patch = mock.patch(
+            "google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1,
+        )
+        load_patch = mock.patch(client_method, autospec=True)
+        with load_patch as client_method, get_config_patch:
+            client.create_job(
+                job_config=job_config,
+                source=source,
+                destination=destination,
+                query=query,
+            )
+        if query:
+            client_method.assert_called_once_with(
+                client, query, job_config=rf1, retry=DEFAULT_RETRY
+            )
+        else:
+            client_method.assert_called_once_with(
+                client, source, destination, job_config=rf1, retry=DEFAULT_RETRY
+            )
+
+    def test_create_job_load_config(self):
+        configuration = {"load": {"sourceUris": "http://example.com/source.csv"}}
+        self._create_job_helper(
+            configuration,
+            "google.cloud.bigquery.client.Client.load_table_from_uri",
+            source="http://example.com/source.csv",
+            destination="dataset_id",
+        )
+
+    def test_create_job_copy_config(self):
+        configuration = {
+            "copy": {
+                "sourceTables": "sourceTable",
+                "destinationTable": "destinationTable",
+            }
+        }
+
+        self._create_job_helper(
+            configuration,
+            "google.cloud.bigquery.client.Client.copy_table",
+            source=mock.Mock(),
+            destination=mock.Mock(),
+        )
+
+    def test_create_job_extract_config(self):
+        configuration = {
+            "extract": {
+                "sourceTable": {"projectId": "project"},
+                "destinationUris": ["destination"],
+            }
+        }
+        self._create_job_helper(
+            configuration,
+            "google.cloud.bigquery.client.Client.extract_table",
+            source=mock.Mock(),
+            destination=mock.Mock(),
+        )
+
+    def test_create_job_query_config(self):
+        configuration = {
+            "query": {"query": "query", "destinationTable": {"tableId": "table_id"}}
+        }
+        self._create_job_helper(
+            configuration,
+            "google.cloud.bigquery.client.Client.query",
+            query=mock.Mock(),
+        )
+
+    def test_create_job_w_invalid_job_config(self):
+        configuration = {"unknown": {}}
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        with self.assertRaises(TypeError) as exc:
+            client.create_job(job_config=configuration)
+
+        self.assertIn("Invalid job configuration", exc.exception.args[0])
+
     def test_job_from_resource_unknown_type(self):
         from google.cloud.bigquery.job import UnknownJob
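A minimal usage sketch of the method as introduced in this first patch; the
project, dataset, and bucket names below are hypothetical, and the snippet
assumes an authenticated client. With this revision the caller still passes
source, destination, and query alongside the configuration dict:

    from google.cloud import bigquery

    client = bigquery.Client()

    # A "load" key in the config routes to LoadJobConfig / load_table_from_uri.
    load_job = client.create_job(
        job_config={"load": {"sourceFormat": "CSV"}},
        source="gs://example-bucket/data.csv",  # hypothetical source URI
        destination="example-project.example_dataset.example_table",
    )

    # A "query" key routes to QueryJobConfig / query(). In this revision the
    # config must contain a "destinationTable" entry, because create_job
    # deletes that key unconditionally before building the QueryJobConfig.
    query_job = client.create_job(
        job_config={"query": {"destinationTable": {"tableId": "tmp"}}},
        query="SELECT 1",
    )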
From 28dbb775980c0a2a68866273f7d9fb4b875254b1 Mon Sep 17 00:00:00 2001
From: Hemang
Date: Fri, 22 Nov 2019 11:05:12 +0530
Subject: [PATCH 2/3] feat(bigquery): cosmetic change

---
 bigquery/google/cloud/bigquery/client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index cb121f12e117..269363dcfc71 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1131,7 +1131,7 @@ def create_job(
     ):
         """Create a new job.
         Arguments:
-            job_config (dict): configuration job representation returned from the API
+            job_config (dict): configuration job representation returned from the API.
 
         Keyword Arguments:
             source (Union[ \
@@ -1141,7 +1141,7 @@ def create_job(
                 Sequence[str]
             ]):
                 (Optional) URIs of data files to be loaded; in format
-                ``gs:///`` or Table
+                ``gs:///`` or Table
                 into which data is to be loaded.
 
             destination (Union[ \
From d9e8015951ede30b6c8e339f72134fc1e793d7a0 Mon Sep 17 00:00:00 2001
From: HemangChothani
Date: Fri, 6 Dec 2019 15:01:53 +0530
Subject: [PATCH 3/3] feat(bigquery): remove the keyword arguments

---
 bigquery/google/cloud/bigquery/client.py | 64 +++++++---------
 bigquery/tests/unit/test_client.py       | 95 ++++++++++++++----------
 2 files changed, 85 insertions(+), 74 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 269363dcfc71..2d04ca30b839 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -51,6 +51,7 @@
 from google.cloud import exceptions
 from google.cloud.client import ClientWithProject
 
+from google.cloud.bigquery._helpers import _get_sub_prop
 from google.cloud.bigquery._helpers import _record_field_to_json
 from google.cloud.bigquery._helpers import _str_or_none
 from google.cloud.bigquery._helpers import _verify_job_config_type
@@ -1126,39 +1127,12 @@ def job_from_resource(self, resource):
             return job.QueryJob.from_api_repr(resource, self)
         return job.UnknownJob.from_api_repr(resource, self)
 
-    def create_job(
-        self, job_config, source=None, destination=None, query=None, retry=DEFAULT_RETRY
-    ):
+    def create_job(self, job_config, retry=DEFAULT_RETRY):
         """Create a new job.
         Arguments:
             job_config (dict): configuration job representation returned from the API.
 
         Keyword Arguments:
-            source (Union[ \
-                google.cloud.bigquery.table.Table, \
-                google.cloud.bigquery.table.TableReference, \
-                str, \
-                Sequence[str]
-            ]):
-                (Optional) URIs of data files to be loaded; in format
-                ``gs:///`` or Table
-                into which data is to be loaded.
-
-            destination (Union[ \
-                google.cloud.bigquery.table.Table, \
-                google.cloud.bigquery.table.TableReference, \
-                str, \
-            ]):
-                (Optional) Table into which data is to be loaded. If a string is passed
-                in, this method attempts to create a table reference from a string using
-                :func:`google.cloud.bigquery.table.TableReference.from_string` or URIs of
-                Cloud Storage file(s) into which table data is to be extracted; in format
-                ``gs:///``.
-
-            query (str):
-                (Optional) SQL query to be executed. Defaults to the standard SQL dialect.
-                Use the ``job_config`` parameter to change dialects.
-
             retry (google.api_core.retry.Retry):
                 (Optional) How to retry the RPC.
@@ -1173,32 +1147,50 @@ def create_job(
         """
 
         if "load" in job_config:
-            job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
+            load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
                 job_config
             )
+            destination = TableReference.from_api_repr(
+                job_config["load"]["destinationTable"]
+            )
+            source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
             return self.load_table_from_uri(
-                source, destination, job_config=job_config, retry=retry
+                source_uris, destination, job_config=load_job_config, retry=retry
             )
         elif "copy" in job_config:
-            job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
+            copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
                 job_config
            )
+            copy_resource = job_config["copy"]
+            destination = TableReference.from_api_repr(
+                copy_resource["destinationTable"]
+            )
+            sources = []
+            source_configs = copy_resource.get("sourceTables")
+            if source_configs is None:
+                source_configs = [copy_resource["sourceTable"]]
+            for source_config in source_configs:
+                table_ref = TableReference.from_api_repr(source_config)
+                sources.append(table_ref)
             return self.copy_table(
-                source, destination, job_config=job_config, retry=retry
+                sources, destination, job_config=copy_job_config, retry=retry
             )
         elif "extract" in job_config:
-            job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
+            extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
                 job_config
             )
+            source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
+            destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
             return self.extract_table(
-                source, destination, job_config=job_config, retry=retry
+                source, destination_uris, job_config=extract_job_config, retry=retry
             )
         elif "query" in job_config:
             del job_config["query"]["destinationTable"]
-            job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
+            query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
                 job_config
             )
-            return self.query(query, job_config=job_config, retry=retry)
+            query = _get_sub_prop(job_config, ["query", "query"])
+            return self.query(query, job_config=query_job_config, retry=retry)
         else:
             raise TypeError("Invalid job configuration received.")
 
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 34c1dcb3e6a4..cef8d7d5bc35 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -46,7 +46,6 @@
 from google.cloud import bigquery_v2
 from google.cloud.bigquery.dataset import DatasetReference
 from tests.unit.helpers import make_connection
-from google.cloud.bigquery.retry import DEFAULT_RETRY
 
 
 def _make_credentials():
@@ -2585,9 +2584,7 @@ def test_delete_table_w_not_found_ok_true(self):
 
         conn.api_request.assert_called_with(method="DELETE", path=path)
 
-    def _create_job_helper(
-        self, job_config, client_method, query=None, source=None, destination=None
-    ):
+    def _create_job_helper(self, job_config, client_method):
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
@@ -2598,58 +2595,82 @@ def _create_job_helper(
             "google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1,
         )
         load_patch = mock.patch(client_method, autospec=True)
+
         with load_patch as client_method, get_config_patch:
-            client.create_job(
-                job_config=job_config,
-                source=source,
-                destination=destination,
-                query=query,
-            )
-        if query:
-            client_method.assert_called_once_with(
-                client, query, job_config=rf1, retry=DEFAULT_RETRY
-            )
-        else:
-            client_method.assert_called_once_with(
-                client, source, destination, job_config=rf1, retry=DEFAULT_RETRY
-            )
+            client.create_job(job_config=job_config)
+            client_method.assert_called_once()
 
     def test_create_job_load_config(self):
-        configuration = {"load": {"sourceUris": "http://example.com/source.csv"}}
+        configuration = {
+            "load": {
+                "destinationTable": {
+                    "projectId": self.PROJECT,
+                    "datasetId": self.DS_ID,
+                    "tableId": "source_table",
+                },
+                "sourceUris": ["gs://test_bucket/src_object*"],
+            }
+        }
+
         self._create_job_helper(
-            configuration,
-            "google.cloud.bigquery.client.Client.load_table_from_uri",
-            source="http://example.com/source.csv",
-            destination="dataset_id",
+            configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
         )
 
     def test_create_job_copy_config(self):
         configuration = {
             "copy": {
-                "sourceTables": "sourceTable",
-                "destinationTable": "destinationTable",
+                "sourceTables": [
+                    {
+                        "projectId": self.PROJECT,
+                        "datasetId": self.DS_ID,
+                        "tableId": "source_table",
+                    }
+                ],
+                "destinationTable": {
+                    "projectId": self.PROJECT,
+                    "datasetId": self.DS_ID,
+                    "tableId": "destination_table",
+                },
             }
         }
 
         self._create_job_helper(
-            configuration,
-            "google.cloud.bigquery.client.Client.copy_table",
-            source=mock.Mock(),
-            destination=mock.Mock(),
+            configuration, "google.cloud.bigquery.client.Client.copy_table",
+        )
+
+    def test_create_job_copy_config_w_single_source(self):
+        configuration = {
+            "copy": {
+                "sourceTable": {
+                    "projectId": self.PROJECT,
+                    "datasetId": self.DS_ID,
+                    "tableId": "source_table",
+                },
+                "destinationTable": {
+                    "projectId": self.PROJECT,
+                    "datasetId": self.DS_ID,
+                    "tableId": "destination_table",
+                },
+            }
+        }
+
+        self._create_job_helper(
+            configuration, "google.cloud.bigquery.client.Client.copy_table",
         )
 
     def test_create_job_extract_config(self):
         configuration = {
             "extract": {
-                "sourceTable": {"projectId": "project"},
-                "destinationUris": ["destination"],
+                "sourceTable": {
+                    "projectId": self.PROJECT,
+                    "datasetId": self.DS_ID,
+                    "tableId": "source_table",
+                },
+                "destinationUris": ["gs://test_bucket/dst_object*"],
             }
         }
         self._create_job_helper(
-            configuration,
-            "google.cloud.bigquery.client.Client.extract_table",
-            source=mock.Mock(),
-            destination=mock.Mock(),
+            configuration, "google.cloud.bigquery.client.Client.extract_table",
         )
 
     def test_create_job_query_config(self):
@@ -2657,9 +2678,7 @@ def test_create_job_query_config(self):
             "query": {"query": "query", "destinationTable": {"tableId": "table_id"}}
         }
         self._create_job_helper(
-            configuration,
-            "google.cloud.bigquery.client.Client.query",
-            query=mock.Mock(),
+            configuration, "google.cloud.bigquery.client.Client.query",
         )
 
     def test_create_job_w_invalid_job_config(self):
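After this final patch the configuration resource is the only input: the
destination table, source tables or URIs, and query string are all read back
out of the dict (via TableReference.from_api_repr and _get_sub_prop) before
dispatching to the matching client method. A minimal sketch of the resulting
API, with hypothetical project, dataset, and bucket names:

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes an authenticated environment

    configuration = {
        "load": {
            "destinationTable": {
                "projectId": "example-project",
                "datasetId": "example_dataset",
                "tableId": "example_table",
            },
            "sourceUris": ["gs://example-bucket/data.csv"],
        }
    }

    # The "load" key dispatches to Client.load_table_from_uri; an unknown
    # top-level key would raise TypeError("Invalid job configuration received.").
    load_job = client.create_job(configuration)
    load_job.result()  # block until the load job completes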