From d195326b484cd53cfd150bdac649e542717111ae Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Tue, 12 Nov 2019 14:52:58 +0200
Subject: [PATCH] fix(bigquery): preserve job config passed to Client methods

This commit ensures that Client methods that accept a job config
argument operate on deep copies, and do not modify the original job
config instances passed to them.
---
 bigquery/google/cloud/bigquery/client.py |  12 +-
 bigquery/tests/unit/test_client.py       | 135 ++++++++++++++++++++++-
 2 files changed, 139 insertions(+), 8 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index bae4359300f8..c8df21e91f55 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -187,7 +187,7 @@ def __init__(
         self._connection = Connection(self, **kw_args)
 
         self._location = location
-        self._default_query_job_config = default_query_job_config
+        self._default_query_job_config = copy.deepcopy(default_query_job_config)
 
     @property
     def location(self):
@@ -1381,6 +1381,7 @@ def load_table_from_uri(
         destination = _table_arg_to_table_ref(destination, default_project=self.project)
 
         if job_config:
+            job_config = copy.deepcopy(job_config)
             _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
 
         load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
@@ -1465,6 +1466,7 @@ def load_table_from_file(
         destination = _table_arg_to_table_ref(destination, default_project=self.project)
         job_ref = job._JobReference(job_id, project=project, location=location)
         if job_config:
+            job_config = copy.deepcopy(job_config)
             _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
         load_job = job.LoadJob(job_ref, None, destination, self, job_config)
         job_resource = load_job.to_api_repr()
@@ -1969,6 +1971,8 @@ def copy_table(
 
         if job_config:
             _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
+            job_config = copy.deepcopy(job_config)
+
         copy_job = job.CopyJob(
             job_ref, sources, destination, client=self, job_config=job_config
         )
@@ -2049,6 +2053,8 @@ def extract_table(
             _verify_job_config_type(
                 job_config, google.cloud.bigquery.job.ExtractJobConfig
             )
+            job_config = copy.deepcopy(job_config)
+
         extract_job = job.ExtractJob(
             job_ref, source, destination_uris, client=self, job_config=job_config
         )
@@ -2112,6 +2118,8 @@ def query(
         if location is None:
             location = self.location
 
+        job_config = copy.deepcopy(job_config)
+
         if self._default_query_job_config:
             if job_config:
                 _verify_job_config_type(
@@ -2129,7 +2137,7 @@
                     self._default_query_job_config,
                     google.cloud.bigquery.job.QueryJobConfig,
                 )
-                job_config = self._default_query_job_config
+                job_config = copy.deepcopy(self._default_query_job_config)
 
         job_ref = job._JobReference(job_id, project=project, location=location)
         query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index e6ed4d1c8072..da3fb2c56689 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -2997,6 +2997,8 @@ def test_load_table_from_uri(self):
         creds = _make_credentials()
         http = object()
         job_config = LoadJobConfig()
+        original_config_copy = copy.deepcopy(job_config)
+
         client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
         conn = client._connection = make_connection(RESOURCE)
         destination = client.dataset(self.DS_ID).table(DESTINATION)
@@ -3010,6 +3012,9 @@ def test_load_table_from_uri(self):
             method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
         )
 
+        # the original config object should not have been modified
+        self.assertEqual(job_config.to_api_repr(), original_config_copy.to_api_repr())
+
         self.assertIsInstance(job, LoadJob)
         self.assertIsInstance(job._configuration, LoadJobConfig)
         self.assertIs(job._client, client)
@@ -3496,19 +3501,24 @@ def test_copy_table_w_valid_job_config(self):
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
-        job_config = CopyJobConfig()
 
         conn = client._connection = make_connection(RESOURCE)
         dataset = client.dataset(self.DS_ID)
         source = dataset.table(SOURCE)
         destination = dataset.table(DESTINATION)
+        job_config = CopyJobConfig()
+        original_config_copy = copy.deepcopy(job_config)
 
         job = client.copy_table(source, destination, job_id=JOB, job_config=job_config)
+
+        # Check that copy_table actually starts the job.
         conn.api_request.assert_called_once_with(
             method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
         )
+
         self.assertIsInstance(job._configuration, CopyJobConfig)
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
     def test_extract_table(self):
         from google.cloud.bigquery.job import ExtractJob
@@ -3679,6 +3689,7 @@ def test_extract_table_generated_job_id(self):
         source = dataset.table(SOURCE)
         job_config = ExtractJobConfig()
         job_config.destination_format = DestinationFormat.NEWLINE_DELIMITED_JSON
+        original_config_copy = copy.deepcopy(job_config)
 
         job = client.extract_table(source, DESTINATION, job_config=job_config)
@@ -3695,6 +3706,9 @@
         self.assertEqual(job.source, source)
         self.assertEqual(list(job.destination_uris), [DESTINATION])
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
     def test_extract_table_w_destination_uris(self):
         from google.cloud.bigquery.job import ExtractJob
@@ -3840,6 +3854,7 @@ def test_query_w_explicit_job_config(self):
         job_config = QueryJobConfig()
         job_config.use_query_cache = True
         job_config.maximum_bytes_billed = 2000
+        original_config_copy = copy.deepcopy(job_config)
 
         client.query(
             query, job_id=job_id, location=self.LOCATION, job_config=job_config
@@ -3850,6 +3865,105 @@
             method="POST", path="/projects/PROJECT/jobs", data=resource
         )
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
+    def test_query_preserving_explicit_job_config(self):
+        job_id = "some-job-id"
+        query = "select count(*) from persons"
+        resource = {
+            "jobReference": {
+                "jobId": job_id,
+                "projectId": self.PROJECT,
+                "location": self.LOCATION,
+            },
+            "configuration": {
+                "query": {
+                    "query": query,
+                    "useLegacySql": False,
+                    "useQueryCache": True,
+                    "maximumBytesBilled": "2000",
+                }
+            },
+        }
+
+        creds = _make_credentials()
+        http = object()
+
+        from google.cloud.bigquery import QueryJobConfig
+
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http,)
+        conn = client._connection = make_connection(resource)
+
+        job_config = QueryJobConfig()
+        job_config.use_query_cache = True
+        job_config.maximum_bytes_billed = 2000
+        original_config_copy = copy.deepcopy(job_config)
+
+        client.query(
+            query, job_id=job_id, location=self.LOCATION, job_config=job_config
+        )
+
+        # Check that query actually starts the job.
+        conn.api_request.assert_called_once_with(
+            method="POST", path="/projects/PROJECT/jobs", data=resource
+        )
+
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
+    def test_query_preserving_explicit_default_job_config(self):
+        job_id = "some-job-id"
+        query = "select count(*) from persons"
+        resource = {
+            "jobReference": {
+                "jobId": job_id,
+                "projectId": self.PROJECT,
+                "location": self.LOCATION,
+            },
+            "configuration": {
+                "query": {
+                    "query": query,
+                    "defaultDataset": {
+                        "projectId": self.PROJECT,
+                        "datasetId": "some-dataset",
+                    },
+                    "useLegacySql": False,
+                    "maximumBytesBilled": "1000",
+                }
+            },
+        }
+
+        creds = _make_credentials()
+        http = object()
+
+        from google.cloud.bigquery import QueryJobConfig, DatasetReference
+
+        default_job_config = QueryJobConfig()
+        default_job_config.default_dataset = DatasetReference(
+            self.PROJECT, "some-dataset"
+        )
+        default_job_config.maximum_bytes_billed = 1000
+        default_config_copy = copy.deepcopy(default_job_config)
+
+        client = self._make_one(
+            project=self.PROJECT,
+            credentials=creds,
+            _http=http,
+            default_query_job_config=default_job_config,
+        )
+        conn = client._connection = make_connection(resource)
+
+        client.query(query, job_id=job_id, location=self.LOCATION, job_config=None)
+
+        # Check that query actually starts the job.
+        conn.api_request.assert_called_once_with(
+            method="POST", path="/projects/PROJECT/jobs", data=resource
+        )
+
+        # the original default config object should not have been modified
+        assert default_job_config.to_api_repr() == default_config_copy.to_api_repr()
+
     def test_query_w_invalid_job_config(self):
         from google.cloud.bigquery import QueryJobConfig, DatasetReference
         from google.cloud.bigquery import job
@@ -5429,22 +5543,24 @@ def test_load_table_from_file_resumable(self):
         client = self._make_client()
         file_obj = self._make_file_obj()
+        job_config = self._make_config()
+        original_config_copy = copy.deepcopy(job_config)
 
         do_upload_patch = self._make_do_upload_patch(
             client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION
         )
         with do_upload_patch as do_upload:
             client.load_table_from_file(
-                file_obj,
-                self.TABLE_REF,
-                job_id="job_id",
-                job_config=self._make_config(),
+                file_obj, self.TABLE_REF, job_id="job_id", job_config=job_config,
             )
 
         do_upload.assert_called_once_with(
             file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
         )
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
     def test_load_table_from_file_w_explicit_project(self):
         from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
@@ -5790,6 +5906,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
         job_config = job.LoadJobConfig(
             write_disposition=job.WriteDisposition.WRITE_TRUNCATE
         )
+        original_config_copy = copy.deepcopy(job_config)
 
         get_table_patch = mock.patch(
             "google.cloud.bigquery.client.Client.get_table",
@@ -5826,6 +5943,9 @@
         assert sent_config.source_format == job.SourceFormat.PARQUET
         assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_automatic_schema(self):
@@ -6466,6 +6586,7 @@ def test_load_table_from_json_non_default_args(self):
         ]
         job_config = job.LoadJobConfig(schema=schema)
         job_config._properties["load"]["unknown_field"] = "foobar"
+        original_config_copy = copy.deepcopy(job_config)
 
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
@@ -6493,13 +6614,15 @@ def test_load_table_from_json_non_default_args(self):
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
-        assert job_config.source_format is None  # the original was not modified
         assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
         assert sent_config.schema == schema
         assert not sent_config.autodetect
         # all properties should have been cloned and sent to the backend
        assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar"
 
+        # the original config object should not have been modified
+        assert job_config.to_api_repr() == original_config_copy.to_api_repr()
+
     def test_load_table_from_json_w_invalid_job_config(self):
         from google.cloud.bigquery import job
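
Note: a minimal sketch of the behavior these changes guarantee, not part of
the patch itself. It assumes application default credentials are available;
the query text and byte limits are illustrative only:

    import copy

    from google.cloud import bigquery

    # A default config handed to the constructor is deep-copied, so later
    # mutations of default_config do not leak into the client.
    default_config = bigquery.QueryJobConfig(maximum_bytes_billed=1000)
    client = bigquery.Client(default_query_job_config=default_config)

    # A config passed to query() is likewise deep-copied before it is merged
    # with the default config and sent to the API.
    job_config = bigquery.QueryJobConfig(use_query_cache=True)
    snapshot = copy.deepcopy(job_config)
    client.query("SELECT 1", job_config=job_config)

    # The caller's instance is untouched and safe to reuse for other jobs.
    assert job_config.to_api_repr() == snapshot.to_api_repr()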