From b901cf24cb12055ab8042974ff6edc02c71490c8 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Fri, 16 Aug 2019 16:02:22 -0700
Subject: [PATCH 1/4] Determine the schema in `load_table_from_dataframe` based on dtypes.

This PR updates `load_table_from_dataframe` to automatically determine
the BigQuery schema based on the DataFrame's dtypes. If any field's type
cannot be determined, fall back to the logic in the pandas `to_parquet`
method.
---
 .../google/cloud/bigquery/_pandas_helpers.py | 40 ++++++++
 bigquery/google/cloud/bigquery/client.py     |  9 ++
 bigquery/tests/system.py                     | 94 +++++++++++++++++++
 3 files changed, 143 insertions(+)

diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index 5cc69e434b04..db7f36f3d93e 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -49,6 +49,21 @@
 
 _PROGRESS_INTERVAL = 0.2  # Maximum time between download status checks, in seconds.
 
+_PANDAS_DTYPE_TO_BQ = {
+    "bool": "BOOLEAN",
+    "datetime64[ns, UTC]": "TIMESTAMP",
+    "datetime64[ns]": "DATETIME",
+    "float32": "FLOAT",
+    "float64": "FLOAT",
+    "int8": "INTEGER",
+    "int16": "INTEGER",
+    "int32": "INTEGER",
+    "int64": "INTEGER",
+    "uint8": "INTEGER",
+    "uint16": "INTEGER",
+    "uint32": "INTEGER",
+}
+
 
 class _DownloadState(object):
     """Flag to indicate that a thread should exit early."""
@@ -172,6 +187,31 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
+def dataframe_to_bq_schema(dataframe):
+    """Convert a pandas DataFrame schema to a BigQuery schema.
+
+    TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
+    schema for a subset of columns.
+
+    Args:
+        dataframe (pandas.DataFrame):
+            DataFrame to convert to a Parquet file.
+
+    Returns:
+        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
+            The automatically determined schema. Returns None if the type of
+            any column cannot be determined.
+    """
+    bq_schema = []
+    for column, dtype in zip(dataframe.columns, dataframe.dtypes):
+        bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
+        if not bq_type:
+            return None
+        bq_field = schema.SchemaField(column, bq_type)
+        bq_schema.append(bq_field)
+    return tuple(bq_schema)
+
+
 def dataframe_to_arrow(dataframe, bq_schema):
     """Convert pandas dataframe to Arrow table, using BigQuery schema.
 
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 04c596975eec..44b9cae87d5c 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -21,6 +21,7 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import copy
 import functools
 import gzip
 import io
@@ -1520,11 +1521,19 @@ def load_table_from_dataframe(
         if job_config is None:
             job_config = job.LoadJobConfig()
+        else:
+            # Make a copy so that the job config isn't modified in-place.
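+            # The copy deep-copies the underlying resource representation
+            # into a fresh LoadJobConfig; source_format (and possibly
+            # schema) are then set on the copy rather than on the caller's
+            # config object.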
+            job_config_properties = copy.deepcopy(job_config._properties)
+            job_config = job.LoadJobConfig()
+            job_config._properties = job_config_properties
 
         job_config.source_format = job.SourceFormat.PARQUET
 
         if location is None:
             location = self.location
 
+        if not job_config.schema:
+            job_config.schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
+
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
 
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index fd9efa7752cf..6fbb96b93311 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -634,6 +634,100 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
             sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength)
         )
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_automatic_schema(self):
+        """Test that a DataFrame with dtypes that map well to BigQuery types
+        can be uploaded without specifying a schema.
+
+        https://github.com/googleapis/google-cloud-python/issues/9044
+        """
+        bool_col = pandas.Series([True, False, True], dtype="bool")
+        ts_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        ).dt.tz_localize(pytz.utc)
+        dt_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        )
+        float32_col = pandas.Series([1.0, 2.0, 3.0], dtype="float32")
+        float64_col = pandas.Series([4.0, 5.0, 6.0], dtype="float64")
+        int8_col = pandas.Series([-12, -11, -10], dtype="int8")
+        int16_col = pandas.Series([-9, -8, -7], dtype="int16")
+        int32_col = pandas.Series([-6, -5, -4], dtype="int32")
+        int64_col = pandas.Series([-3, -2, -1], dtype="int64")
+        uint8_col = pandas.Series([0, 1, 2], dtype="uint8")
+        uint16_col = pandas.Series([3, 4, 5], dtype="uint16")
+        uint32_col = pandas.Series([6, 7, 8], dtype="uint32")
+        dataframe = pandas.DataFrame(
+            {
+                "bool_col": bool_col,
+                "ts_col": ts_col,
+                "dt_col": dt_col,
+                "float32_col": float32_col,
+                "float64_col": float64_col,
+                "int8_col": int8_col,
+                "int16_col": int16_col,
+                "int32_col": int32_col,
+                "int64_col": int64_col,
+                "uint8_col": uint8_col,
+                "uint16_col": uint16_col,
+                "uint32_col": uint32_col,
+            },
+            columns=[
+                "bool_col",
+                "ts_col",
+                "dt_col",
+                "float32_col",
+                "float64_col",
+                "int8_col",
+                "int16_col",
+                "int32_col",
+                "int64_col",
+                "uint8_col",
+                "uint16_col",
+                "uint32_col",
+            ],
+        )
+
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_automatic_schema".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table_id)
+        self.assertEqual(
+            tuple(table.schema),
+            (
+                bigquery.SchemaField("bool_col", "BOOLEAN"),
+                bigquery.SchemaField("ts_col", "TIMESTAMP"),
+                bigquery.SchemaField("dt_col", "DATETIME"),
+                bigquery.SchemaField("float32_col", "FLOAT"),
+                bigquery.SchemaField("float64_col", "FLOAT"),
+                bigquery.SchemaField("int8_col", "INTEGER"),
+                bigquery.SchemaField("int16_col", "INTEGER"),
+                bigquery.SchemaField("int32_col", "INTEGER"),
+                bigquery.SchemaField("int64_col", "INTEGER"),
+                bigquery.SchemaField("uint8_col", "INTEGER"),
+                bigquery.SchemaField("uint16_col", "INTEGER"),
+                bigquery.SchemaField("uint32_col", "INTEGER"),
+            ),
+        )
+        self.assertEqual(table.num_rows, 3)
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_nulls(self):

From a6126b7e18a82d1dca1178243d302422e5634331 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 20 Aug 2019 14:41:43 -0700
Subject: [PATCH 2/4] Fix test coverage.

---
 bigquery/google/cloud/bigquery/client.py |  8 ++-
 bigquery/tests/unit/test_client.py       | 69 +++++++++++++++++++++++-
 2 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 44b9cae87d5c..56aa7a7a61e8 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1532,7 +1532,13 @@ def load_table_from_dataframe(
             location = self.location
 
         if not job_config.schema:
-            job_config.schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
+            autodetected_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)
+
+            # Only use an explicit schema if we were able to determine one
+            # matching the dataframe. If not, fall back to the pandas
+            # to_parquet method.
+            if autodetected_schema:
+                job_config.schema = autodetected_schema
 
         tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
         os.close(tmpfd)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index c4e9c5e830ac..6ad8f33de0d4 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5325,9 +5325,74 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
-        assert sent_config is job_config
         assert sent_config.source_format == job.SourceFormat.PARQUET
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_automatic_schema(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        dt_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        )
+        ts_col = pandas.Series(
+            [
+                datetime.datetime(2010, 1, 2, 3, 44, 50),
+                datetime.datetime(2011, 2, 3, 14, 50, 59),
+                datetime.datetime(2012, 3, 14, 15, 16),
+            ],
+            dtype="datetime64[ns]",
+        ).dt.tz_localize(pytz.utc)
+        df_data = {
+            "int_col": [1, 2, 3],
+            "float_col": [1.0, 2.0, 3.0],
+            "bool_col": [True, False, True],
+            "dt_col": dt_col,
+            "ts_col": ts_col,
+        }
+        dataframe = pandas.DataFrame(
+            df_data, columns=["int_col", "float_col", "bool_col", "dt_col", "ts_col"]
+        )
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        with load_patch as load_table_from_file:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert tuple(sent_config.schema) == (
+            SchemaField("int_col", "INTEGER"),
+            SchemaField("float_col", "FLOAT"),
+            SchemaField("bool_col", "BOOLEAN"),
+            SchemaField("dt_col", "DATETIME"),
+            SchemaField("ts_col", "TIMESTAMP"),
+        )
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
@@ -5475,7 +5540,7 @@ def test_load_table_from_dataframe_w_nulls(self):
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
-        assert sent_config is job_config
+        assert sent_config.schema == schema
         assert sent_config.source_format == job.SourceFormat.PARQUET
 
 # Low-level tests

From caa0f1fd2b3ed42151739557889f71b04f385e20 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 10:40:26 -0700
Subject: [PATCH 3/4] Reduce duplication by using OrderedDict

---
 bigquery/tests/system.py           | 92 ++++++++++++------------------
 bigquery/tests/unit/test_client.py | 53 +++++++++--------
 2 files changed, 66 insertions(+), 79 deletions(-)

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 6fbb96b93311..0e9d7bb67cd7 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import base64
+import collections
 import concurrent.futures
 import csv
 import datetime
@@ -642,62 +643,43 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
 
         https://github.com/googleapis/google-cloud-python/issues/9044
         """
-        bool_col = pandas.Series([True, False, True], dtype="bool")
-        ts_col = pandas.Series(
+        df_data = collections.OrderedDict(
             [
-                datetime.datetime(2010, 1, 2, 3, 44, 50),
-                datetime.datetime(2011, 2, 3, 14, 50, 59),
-                datetime.datetime(2012, 3, 14, 15, 16),
-            ],
-            dtype="datetime64[ns]",
-        ).dt.tz_localize(pytz.utc)
-        dt_col = pandas.Series(
-            [
-                datetime.datetime(2010, 1, 2, 3, 44, 50),
-                datetime.datetime(2011, 2, 3, 14, 50, 59),
-                datetime.datetime(2012, 3, 14, 15, 16),
-            ],
-            dtype="datetime64[ns]",
-        )
-        float32_col = pandas.Series([1.0, 2.0, 3.0], dtype="float32")
-        float64_col = pandas.Series([4.0, 5.0, 6.0], dtype="float64")
-        int8_col = pandas.Series([-12, -11, -10], dtype="int8")
-        int16_col = pandas.Series([-9, -8, -7], dtype="int16")
-        int32_col = pandas.Series([-6, -5, -4], dtype="int32")
-        int64_col = pandas.Series([-3, -2, -1], dtype="int64")
-        uint8_col = pandas.Series([0, 1, 2], dtype="uint8")
-        uint16_col = pandas.Series([3, 4, 5], dtype="uint16")
-        uint32_col = pandas.Series([6, 7, 8], dtype="uint32")
-        dataframe = pandas.DataFrame(
-            {
-                "bool_col": bool_col,
-                "ts_col": ts_col,
-                "dt_col": dt_col,
-                "float32_col": float32_col,
-                "float64_col": float64_col,
-                "int8_col": int8_col,
-                "int16_col": int16_col,
-                "int32_col": int32_col,
-                "int64_col": int64_col,
-                "uint8_col": uint8_col,
-                "uint16_col": uint16_col,
-                "uint32_col": uint32_col,
-            },
-            columns=[
-                "bool_col",
-                "ts_col",
-                "dt_col",
-                "float32_col",
-                "float64_col",
-                "int8_col",
-                "int16_col",
-                "int32_col",
-                "int64_col",
-                "uint8_col",
-                "uint16_col",
-                "uint32_col",
-            ],
-        )
+                ("bool_col", pandas.Series([True, False, True], dtype="bool")),
+                (
+                    "ts_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ).dt.tz_localize(pytz.utc),
+                ),
+                (
+                    "dt_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ),
+                ),
+                ("float32_col", pandas.Series([1.0, 2.0, 3.0], dtype="float32")),
+                ("float64_col", pandas.Series([4.0, 5.0, 6.0], dtype="float64")),
+                ("int8_col", pandas.Series([-12, -11, -10], dtype="int8")),
+                ("int16_col", pandas.Series([-9, -8, -7], dtype="int16")),
+                ("int32_col", pandas.Series([-6, -5, -4], dtype="int32")),
+                ("int64_col", pandas.Series([-3, -2, -1], dtype="int64")),
+                ("uint8_col", pandas.Series([0, 1, 2], dtype="uint8")),
+                ("uint16_col", pandas.Series([3, 4, 5], dtype="uint16")),
+                ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data)
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 86fdfec469b2..c59fe80eb480 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import collections
 import datetime
 import decimal
 import email
@@ -5335,32 +5336,36 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        dt_col = pandas.Series(
+        df_data = collections.OrderedDict(
             [
-                datetime.datetime(2010, 1, 2, 3, 44, 50),
-                datetime.datetime(2011, 2, 3, 14, 50, 59),
-                datetime.datetime(2012, 3, 14, 15, 16),
-            ],
-            dtype="datetime64[ns]",
-        )
-        ts_col = pandas.Series(
-            [
-                datetime.datetime(2010, 1, 2, 3, 44, 50),
-                datetime.datetime(2011, 2, 3, 14, 50, 59),
-                datetime.datetime(2012, 3, 14, 15, 16),
-            ],
-            dtype="datetime64[ns]",
-        ).dt.tz_localize(pytz.utc)
-        df_data = {
-            "int_col": [1, 2, 3],
-            "float_col": [1.0, 2.0, 3.0],
-            "bool_col": [True, False, True],
-            "dt_col": dt_col,
-            "ts_col": ts_col,
-        }
-        dataframe = pandas.DataFrame(
-            df_data, columns=["int_col", "float_col", "bool_col", "dt_col", "ts_col"]
+                ("int_col", [1, 2, 3]),
+                ("float_col", [1.0, 2.0, 3.0]),
+                ("bool_col", [True, False, True]),
+                (
+                    "dt_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ),
+                ),
+                (
+                    "ts_col",
+                    pandas.Series(
+                        [
+                            datetime.datetime(2010, 1, 2, 3, 44, 50),
+                            datetime.datetime(2011, 2, 3, 14, 50, 59),
+                            datetime.datetime(2012, 3, 14, 15, 16),
+                        ],
+                        dtype="datetime64[ns]",
+                    ).dt.tz_localize(pytz.utc),
+                ),
+            ]
         )
+        dataframe = pandas.DataFrame(df_data)
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )

From ab222f03a06b1cb3259c59e2a83f795354b68bad Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 21 Aug 2019 10:44:37 -0700
Subject: [PATCH 4/4] Add columns option to DataFrame constructor to ensure correct column order.
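
Even though `collections.OrderedDict` preserves insertion order, passing
`columns=df_data.keys()` makes the intended column order explicit rather
than relying on how a given pandas version handles dict-like input. A
rough sketch of the two constructor calls (not part of the diff below):

    dataframe = pandas.DataFrame(df_data)                          # order left to pandas
    dataframe = pandas.DataFrame(df_data, columns=df_data.keys())  # order pinned explicitly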
---
 bigquery/tests/system.py           | 2 +-
 bigquery/tests/unit/test_client.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 0e9d7bb67cd7..59a72297ed87 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -679,7 +679,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
                 ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")),
             ]
         )
-        dataframe = pandas.DataFrame(df_data)
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
 
         dataset_id = _make_dataset_id("bq_load_test")
         self.temp_dataset(dataset_id)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index c59fe80eb480..8a2a1228cd65 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5365,7 +5365,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
                 ),
             ]
         )
-        dataframe = pandas.DataFrame(df_data)
+        dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
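
A minimal usage sketch of the behavior this series adds (not part of the
patches; assumes a reachable BigQuery project and an existing dataset,
with hypothetical names):

    import datetime

    import pandas

    from google.cloud import bigquery

    client = bigquery.Client()
    dataframe = pandas.DataFrame(
        {
            "int_col": pandas.Series([1, 2, 3], dtype="int64"),
            "ts_col": pandas.Series(
                [
                    datetime.datetime(2010, 1, 2, 3, 44, 50),
                    datetime.datetime(2011, 2, 3, 14, 50, 59),
                    datetime.datetime(2012, 3, 14, 15, 16),
                ],
                dtype="datetime64[ns]",
            ).dt.tz_localize("UTC"),
        }
    )

    # No job_config.schema is set: int64 maps to INTEGER and
    # datetime64[ns, UTC] maps to TIMESTAMP via _PANDAS_DTYPE_TO_BQ. If any
    # column's dtype were missing from that mapping, the client would fall
    # back to the schema pyarrow derives when writing the parquet file.
    load_job = client.load_table_from_dataframe(
        dataframe, "my-project.my_dataset.my_table"
    )
    load_job.result()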