def create_job(self, job_config, retry=DEFAULT_RETRY):
    """Create a new job from an API-style job configuration.

    The first recognized top-level key of ``job_config`` (checked in the
    order ``"load"``, ``"copy"``, ``"extract"``, ``"query"``) selects the
    client method that the call is delegated to.

    Arguments:
        job_config (dict): configuration job representation returned from the API.
            The mapping is only read; it is never modified by this method.

    Keyword Arguments:
        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.

    Raises:
        TypeError: If ``job_config`` contains none of the recognized keys.
        KeyError: If a load or copy configuration lacks
            ``destinationTable``, an extract configuration lacks
            ``sourceTable``, or a copy configuration has neither
            ``sourceTables`` nor ``sourceTable``.
    """
    if "load" in job_config:
        load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        destination = TableReference.from_api_repr(
            job_config["load"]["destinationTable"]
        )
        source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
        return self.load_table_from_uri(
            source_uris, destination, job_config=load_job_config, retry=retry
        )

    if "copy" in job_config:
        copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        copy_resource = job_config["copy"]
        destination = TableReference.from_api_repr(
            copy_resource["destinationTable"]
        )
        # The configuration may carry either a list ("sourceTables") or a
        # single table ("sourceTable"); normalize both shapes to a list.
        source_configs = copy_resource.get("sourceTables")
        if source_configs is None:
            source_configs = [copy_resource["sourceTable"]]
        sources = [
            TableReference.from_api_repr(source_config)
            for source_config in source_configs
        ]
        return self.copy_table(
            sources, destination, job_config=copy_job_config, retry=retry
        )

    if "extract" in job_config:
        extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
        destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
        return self.extract_table(
            source, destination_uris, job_config=extract_job_config, retry=retry
        )

    if "query" in job_config:
        # NOTE(review): the previous implementation discarded any
        # "destinationTable" before building the QueryJobConfig; that behavior
        # is preserved. It did so with ``del`` on the caller's dictionary,
        # which raised KeyError when the key was absent and mutated the
        # caller's input. Filter into fresh dictionaries instead.
        query_resource = {
            key: value
            for key, value in job_config["query"].items()
            if key != "destinationTable"
        }
        sanitized_config = dict(job_config)
        sanitized_config["query"] = query_resource
        query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            sanitized_config
        )
        query = _get_sub_prop(job_config, ["query", "query"])
        return self.query(query, job_config=query_job_config, retry=retry)

    raise TypeError("Invalid job configuration received.")
def _create_job_helper(self, job_config, client_method):
    """Check that ``create_job`` calls the patched client method exactly once.

    ``client_method`` is the dotted path of the client method that is
    expected to receive the call; it is replaced by an autospec'd mock for
    the duration of the ``create_job`` call.
    """
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    client._connection = make_connection()

    # Stub out config parsing so only the dispatch logic is exercised.
    fake_config = mock.Mock()
    config_patch = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=fake_config,
    )
    method_patch = mock.patch(client_method, autospec=True)

    with method_patch as mocked_method, config_patch:
        client.create_job(job_config=job_config)

    mocked_method.assert_called_once()


def test_create_job_load_config(self):
    """A ``load`` configuration is routed to ``load_table_from_uri``."""
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "load": {
            "destinationTable": destination_table,
            "sourceUris": ["gs://test_bucket/src_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
    )


def test_create_job_copy_config(self):
    """A ``copy`` configuration with ``sourceTables`` is routed to ``copy_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTables": [source_table],
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table"
    )


def test_create_job_copy_config_w_single_source(self):
    """A ``copy`` configuration with one ``sourceTable`` is routed to ``copy_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTable": source_table,
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table"
    )


def test_create_job_extract_config(self):
    """An ``extract`` configuration is routed to ``extract_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "extract": {
            "sourceTable": source_table,
            "destinationUris": ["gs://test_bucket/dst_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.extract_table"
    )


def test_create_job_query_config(self):
    """A ``query`` configuration is routed to ``query``."""
    query_resource = {
        "query": "query",
        "destinationTable": {"tableId": "table_id"},
    }
    configuration = {"query": query_resource}

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.query"
    )


def test_create_job_w_invalid_job_config(self):
    """A configuration without a recognized job type raises ``TypeError``."""
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    configuration = {"unknown": {}}

    with self.assertRaises(TypeError) as exc:
        client.create_job(job_config=configuration)

    self.assertIn("Invalid job configuration", exc.exception.args[0])