From d6c2ab5e443f65a7d6f305757c7943268d1b365d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 May 2019 14:11:52 -0700 Subject: [PATCH 1/6] Use `job_config.schema` for data type conversion if specified in `load_table_from_dataframe`. Use the BigQuery schema to inform encoding of file used in load job. This fixes an issue where a dataframe with ambiguous types (such as an `object` column containing all `None` values) could not be appended to an existing table, since the schemas wouldn't match in most cases. --- .../google/cloud/bigquery/_pandas_helpers.py | 115 ++++++++++++++++++ bigquery/google/cloud/bigquery/client.py | 12 +- bigquery/tests/system.py | 65 ++++++++++ bigquery/tests/unit/test__pandas_helpers.py | 112 +++++++++++++++++ 4 files changed, 303 insertions(+), 1 deletion(-) create mode 100644 bigquery/google/cloud/bigquery/_pandas_helpers.py create mode 100644 bigquery/tests/unit/test__pandas_helpers.py diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py new file mode 100644 index 000000000000..52ac42ffa682 --- /dev/null +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -0,0 +1,115 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helper functions for connecting BigQuery and pandas.""" + +try: + import pyarrow + import pyarrow.parquet +except ImportError: # pragma: NO COVER + pyarrow = None +import six.moves + + +def pyarrow_datetime(): + return pyarrow.timestamp("us", tz=None) + + +def pyarrow_numeric(): + return pyarrow.decimal128(38, 9) + + +def pyarrow_time(): + return pyarrow.time64("us") + + +def pyarrow_timestamp(): + return pyarrow.timestamp("us", tz="UTC") + + +BQ_TO_ARROW_SCALARS = {} +if pyarrow is not None: + BQ_TO_ARROW_SCALARS = { + "BOOL": pyarrow.bool_, + "BOOLEAN": pyarrow.bool_, + "BYTES": pyarrow.binary, + "DATE": pyarrow.date32, + "DATETIME": pyarrow_datetime, + "FLOAT": pyarrow.float64, + "FLOAT64": pyarrow.float64, + "GEOGRAPHY": pyarrow.string, + "INT64": pyarrow.int64, + "INTEGER": pyarrow.int64, + "NUMERIC": pyarrow_numeric, + "STRING": pyarrow.string, + "TIME": pyarrow_time, + "TIMESTAMP": pyarrow_timestamp, + } + + +def bq_to_arrow_data_type(field): + """Return the Arrow data type, corresponding to a given BigQuery column. + + Returns None if default Arrow type inspection should be used. + """ + # TODO: Use pyarrow.list_(item_type) for repeated (array) fields. + if field.mode is not None and field.mode.upper() == "REPEATED": + return None + # TODO: Use pyarrow.struct(fields) for record (struct) fields. + if field.field_type.upper() in ("RECORD", "STRUCT"): + return None + + data_type_constructor = BQ_TO_ARROW_SCALARS.get(field.field_type.upper()) + if data_type_constructor is None: + return None + return data_type_constructor() + + +def to_parquet(dataframe, bq_schema, filepath): + """Write dataframe as a Parquet file, according to the desired BQ schema. + + This function requires the :mod:`pyarrow` package. 
Arrow is used as an + intermediate format. + + Args: + dataframe (pandas.DataFrame): + DataFrame to convert to convert to Parquet file. + bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): + Desired BigQuery schema. Number of columns must match number of + columns in the DataFrame. + filepath (str): + Path to write Parquet file to. + """ + if pyarrow is None: + raise ValueError("pyarrow is required for BigQuery schema conversion") + + if len(bq_schema) != len(dataframe.columns): + # TODO: match names, too. + raise ValueError( + "Number of columns in schema must match number of columns in dataframe" + ) + + arrow_arrays = [] + column_names = [] + for bq_field in bq_schema: + column_names.append(bq_field.name) + arrow_arrays.append( + pyarrow.array( + dataframe[bq_field.name], type=bq_to_arrow_data_type(bq_field) + ) + ) + + # TODO: make pyarrow table and write to parquet. + arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=column_names) + pyarrow.parquet.write_table(arrow_table, filepath) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index f61c18f11bd4..2517639d6d8e 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -44,6 +44,7 @@ from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none from google.cloud.bigquery._http import Connection +from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetListItem from google.cloud.bigquery.dataset import DatasetReference @@ -1274,6 +1275,12 @@ def load_table_from_dataframe( job_config (google.cloud.bigquery.job.LoadJobConfig, optional): Extra configuration options for the job. + To override the default pandas data type conversions, supply + a BigQuery schema in the job configuration. If a schema is + supplied, the BigQuery schema will be used to determine the + correct pandas to parquet type conversion. Indexes are not + loaded. Requires the :mod:`pyarrow` library. + Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -1296,7 +1303,10 @@ def load_table_from_dataframe( os.close(tmpfd) try: - dataframe.to_parquet(tmppath) + if job_config.schema: + _pandas_helpers.to_parquet(dataframe, job_config.schema, tmppath) + else: + dataframe.to_parquet(tmppath) with open(tmppath, "rb") as parquet_file: return self.load_table_from_file( diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index cceca192b8f7..77b44ff5e22a 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -36,6 +36,10 @@ import pandas except ImportError: # pragma: NO COVER pandas = None +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None try: import IPython from IPython.utils import io @@ -622,6 +626,67 @@ def test_load_table_from_local_avro_file_then_dump_table(self): sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength) ) + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_nulls(self): + """Test that a DataFrame with null columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/7370 + """ + # TODO: make table with certain schema. Try to load / append to that table with a bunch of null columns. + # Schema with all scalar types. 
+ table_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + num_rows = 100 + nulls = [None] * num_rows + dataframe = pandas.DataFrame( + { + "bool_col": nulls, + "bytes_col": nulls, + "date_col": nulls, + "dt_col": nulls, + "float_col": nulls, + "geo_col": nulls, + "int_col": nulls, + "num_col": nulls, + "str_col": nulls, + "time_col": nulls, + "ts_col": nulls, + } + ) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_nulls".format( + Config.CLIENT.project, dataset_id + ) + table = retry_403(Config.CLIENT.create_table)( + Table(table_id, schema=table_schema) + ) + self.to_delete.insert(0, table) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, num_rows) + def test_load_avro_from_uri_then_dump_table(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SourceFormat diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py new file mode 100644 index 000000000000..8478d7871644 --- /dev/null +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -0,0 +1,112 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pyarrow.types +import pytest + +from google.cloud.bigquery import schema + + +@pytest.fixture +def module_under_test(): + from google.cloud.bigquery import _pandas_helpers + + return _pandas_helpers + + +def is_none(value): + return value is None + + +def is_numeric(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + return all_( + pyarrow.types.is_decimal, + lambda type_: type_.precision == 38, + lambda type_: type_.scale == 9, + ) + + +def is_timestamp(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz == "UTC", + ) + + +def is_datetime(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz is None, + ) + + +def all_(*functions): + def do_all(value): + return all((func(value) for func in functions)) + + return do_all + + +@pytest.mark.parametrize( + "bq_type,bq_mode,is_correct_type", + [ + ("STRING", "NULLABLE", pyarrow.types.is_string), + ("STRING", None, pyarrow.types.is_string), + ("string", "NULLABLE", pyarrow.types.is_string), + ("StRiNg", "NULLABLE", pyarrow.types.is_string), + ("BYTES", "NULLABLE", pyarrow.types.is_binary), + ("INTEGER", "NULLABLE", pyarrow.types.is_int64), + ("INT64", "NULLABLE", pyarrow.types.is_int64), + ("FLOAT", "NULLABLE", pyarrow.types.is_float64), + ("FLOAT64", "NULLABLE", pyarrow.types.is_float64), + ("NUMERIC", "NULLABLE", is_numeric), + ("BOOLEAN", "NULLABLE", pyarrow.types.is_boolean), + ("BOOL", "NULLABLE", pyarrow.types.is_boolean), + ("TIMESTAMP", "NULLABLE", is_timestamp), + ("DATE", "NULLABLE", pyarrow.types.is_date32), + ("TIME", "NULLABLE", pyarrow.types.is_time64), + ("DATETIME", "NULLABLE", is_datetime), + ("GEOGRAPHY", "NULLABLE", pyarrow.types.is_string), + # TODO: Use pyarrow.struct(fields) for record (struct) fields. + ("RECORD", "NULLABLE", is_none), + ("STRUCT", "NULLABLE", is_none), + # TODO: Use pyarrow.list_(item_type) for repeated (array) fields. + ("STRING", "REPEATED", is_none), + ("STRING", "repeated", is_none), + ("STRING", "RePeAtEd", is_none), + ("BYTES", "REPEATED", is_none), + ("INTEGER", "REPEATED", is_none), + ("INT64", "REPEATED", is_none), + ("FLOAT", "REPEATED", is_none), + ("FLOAT64", "REPEATED", is_none), + ("NUMERIC", "REPEATED", is_none), + ("BOOLEAN", "REPEATED", is_none), + ("BOOL", "REPEATED", is_none), + ("TIMESTAMP", "REPEATED", is_none), + ("DATE", "REPEATED", is_none), + ("TIME", "REPEATED", is_none), + ("DATETIME", "REPEATED", is_none), + ("GEOGRAPHY", "REPEATED", is_none), + ("RECORD", "REPEATED", is_none), + ], +) +def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_type): + field = schema.SchemaField("ignored_name", bq_type, mode=bq_mode) + got = module_under_test.bq_to_arrow_data_type(field) + assert is_correct_type(got) From 46c3a12137cd27a8faa1f2836d34e4048524a21e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 22 May 2019 16:35:28 -0700 Subject: [PATCH 2/6] Improve code coverage. 
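For reference, a minimal sketch of the scalar-type mapping these unit tests pin down (assuming pyarrow is installed; the column names are placeholders, not part of the library):

    import pyarrow
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import _pandas_helpers

    # Known scalar types map to a concrete Arrow type.
    int_field = schema.SchemaField("int_col", "INTEGER")
    assert _pandas_helpers.bq_to_arrow_data_type(int_field) == pyarrow.int64()

    # Types without a mapping return None so that pyarrow's own type
    # inference is used instead.
    unknown_field = schema.SchemaField("mystery_col", "UNKNOWN_TYPE")
    assert _pandas_helpers.bq_to_arrow_data_type(unknown_field) is None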
--- .../google/cloud/bigquery/_pandas_helpers.py | 5 +- bigquery/tests/system.py | 1 - bigquery/tests/unit/test__pandas_helpers.py | 76 +++++++++++++++---- bigquery/tests/unit/test_client.py | 43 +++++++++++ 4 files changed, 107 insertions(+), 18 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 52ac42ffa682..d5c76935c7e6 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -19,7 +19,6 @@ import pyarrow.parquet except ImportError: # pragma: NO COVER pyarrow = None -import six.moves def pyarrow_datetime(): @@ -39,7 +38,7 @@ def pyarrow_timestamp(): BQ_TO_ARROW_SCALARS = {} -if pyarrow is not None: +if pyarrow is not None: # pragma: NO COVER BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -95,7 +94,6 @@ def to_parquet(dataframe, bq_schema, filepath): raise ValueError("pyarrow is required for BigQuery schema conversion") if len(bq_schema) != len(dataframe.columns): - # TODO: match names, too. raise ValueError( "Number of columns in schema must match number of columns in dataframe" ) @@ -110,6 +108,5 @@ def to_parquet(dataframe, bq_schema, filepath): ) ) - # TODO: make pyarrow table and write to parquet. arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=column_names) pyarrow.parquet.write_table(arrow_table, filepath) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 77b44ff5e22a..8960fe93f4cd 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -634,7 +634,6 @@ def test_load_table_from_dataframe_w_nulls(self): See: https://github.com/googleapis/google-cloud-python/issues/7370 """ - # TODO: make table with certain schema. Try to load / append to that table with a bunch of null columns. # Schema with all scalar types. table_schema = ( bigquery.SchemaField("bool_col", "BOOLEAN"), diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 8478d7871644..2b0e3a8b0dbb 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -12,7 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import pyarrow.types +import functools + +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None +try: + import pyarrow + import pyarrow.types +except ImportError: # pragma: NO COVER + pyarrow = None import pytest from google.cloud.bigquery import schema @@ -29,13 +39,22 @@ def is_none(value): return value is None +def is_datetime(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz is None, + )(type_) + + def is_numeric(type_): # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type return all_( pyarrow.types.is_decimal, lambda type_: type_.precision == 38, lambda type_: type_.scale == 9, - ) + )(type_) def is_timestamp(type_): @@ -44,23 +63,34 @@ def is_timestamp(type_): pyarrow.types.is_timestamp, lambda type_: type_.unit == "us", lambda type_: type_.tz == "UTC", - ) + )(type_) -def is_datetime(type_): - # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime-type - return all_( - pyarrow.types.is_timestamp, - lambda type_: type_.unit == "us", - lambda type_: type_.tz is None, - ) +def do_all(functions, value): + return all((func(value) for func in functions)) def all_(*functions): - def do_all(value): - return all((func(value) for func in functions)) + return functools.partial(do_all, functions) + + +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_is_datetime(): + assert is_datetime(pyarrow.timestamp("us", tz=None)) + assert not is_datetime(pyarrow.timestamp("ms", tz=None)) + assert not is_datetime(pyarrow.timestamp("us", tz="UTC")) + assert not is_datetime(pyarrow.string()) + + +def test_do_all(): + assert do_all((lambda _: True, lambda _: True), None) + assert not do_all((lambda _: True, lambda _: False), None) + assert not do_all((lambda _: False,), None) - return do_all + +def test_all_(): + assert all_(lambda _: True, lambda _: True)(None) + assert not all_(lambda _: True, lambda _: False)(None) @pytest.mark.parametrize( @@ -83,6 +113,7 @@ def do_all(value): ("TIME", "NULLABLE", pyarrow.types.is_time64), ("DATETIME", "NULLABLE", is_datetime), ("GEOGRAPHY", "NULLABLE", pyarrow.types.is_string), + ("UNKNOWN_TYPE", "NULLABLE", is_none), # TODO: Use pyarrow.struct(fields) for record (struct) fields. 
("RECORD", "NULLABLE", is_none), ("STRUCT", "NULLABLE", is_none), @@ -106,7 +137,26 @@ def do_all(value): ("RECORD", "REPEATED", is_none), ], ) +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_type): field = schema.SchemaField("ignored_name", bq_type, mode=bq_mode) got = module_under_test.bq_to_arrow_data_type(field) assert is_correct_type(got) + + +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +def test_to_parquet_without_pyarrow(module_under_test, monkeypatch): + monkeypatch.setattr(module_under_test, "pyarrow", None) + with pytest.raises(ValueError) as exc: + module_under_test.to_parquet(pandas.DataFrame(), (), None) + assert "pyarrow is required" in str(exc) + + +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_to_parquet_w_missing_columns(module_under_test, monkeypatch): + with pytest.raises(ValueError) as exc: + module_under_test.to_parquet( + pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None + ) + assert "columns in schema must match" in str(exc) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 13889f90d7e8..dd98f2bcce64 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5000,6 +5000,49 @@ def test_load_table_from_dataframe_w_custom_job_config(self): assert sent_config is job_config assert sent_config.source_format == job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_nulls(self): + """Test that a DataFrame with null columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/7370 + """ + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + + client = self._make_client() + records = [{"name": None, "age": None}, {"name": None, "age": None}] + dataframe = pandas.DataFrame(records) + schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] + job_config = job.LoadJobConfig(schema=schema) + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config is job_config + assert sent_config.source_format == job.SourceFormat.PARQUET + # Low-level tests @classmethod From 58e59ea7e2e8ea75fb3dabf60cc428d59fbcf28d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 22 May 2019 16:43:59 -0700 Subject: [PATCH 3/6] Link to LoadJobConfig.schema in docstring. 
--- bigquery/google/cloud/bigquery/client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 2517639d6d8e..78d718aa6a2a 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1272,14 +1272,15 @@ def load_table_from_dataframe( project (str, optional): Project ID of the project of where to run the job. Defaults to the client's project. - job_config (google.cloud.bigquery.job.LoadJobConfig, optional): + job_config (~google.cloud.bigquery.job.LoadJobConfig, optional): Extra configuration options for the job. To override the default pandas data type conversions, supply - a BigQuery schema in the job configuration. If a schema is - supplied, the BigQuery schema will be used to determine the - correct pandas to parquet type conversion. Indexes are not - loaded. Requires the :mod:`pyarrow` library. + a value for + :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with + column names matching those of the dataframe. The BigQuery + schema is used to determine the correct data type conversion. + Indexes are not loaded. Requires the :mod:`pyarrow` library. Returns: google.cloud.bigquery.job.LoadJob: A new load job. From aa38e4206e115e13c0a6c16c98276821ad443cea Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 29 May 2019 17:38:10 -0700 Subject: [PATCH 4/6] Support array and struct data type conversions. --- .../google/cloud/bigquery/_pandas_helpers.py | 60 ++++-- bigquery/tests/system.py | 11 +- bigquery/tests/unit/test__pandas_helpers.py | 191 ++++++++++++++++-- 3 files changed, 223 insertions(+), 39 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index d5c76935c7e6..f1353cabb7f1 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -20,6 +20,11 @@ except ImportError: # pragma: NO COVER pyarrow = None +from google.cloud.bigquery import schema + + +STRUCT_TYPES = ("RECORD", "STRUCT") + def pyarrow_datetime(): return pyarrow.timestamp("us", tz=None) @@ -37,8 +42,7 @@ def pyarrow_timestamp(): return pyarrow.timestamp("us", tz="UTC") -BQ_TO_ARROW_SCALARS = {} -if pyarrow is not None: # pragma: NO COVER +if pyarrow: # pragma: NO COVER BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -55,6 +59,8 @@ def pyarrow_timestamp(): "TIME": pyarrow_time, "TIMESTAMP": pyarrow_timestamp, } +else: + BQ_TO_ARROW_SCALARS = {} def bq_to_arrow_data_type(field): @@ -62,12 +68,17 @@ def bq_to_arrow_data_type(field): Returns None if default Arrow type inspection should be used. """ - # TODO: Use pyarrow.list_(item_type) for repeated (array) fields. if field.mode is not None and field.mode.upper() == "REPEATED": + inner_type = bq_to_arrow_data_type( + schema.SchemaField(field.name, field.field_type) + ) + if inner_type: + return pyarrow.list_(inner_type) return None - # TODO: Use pyarrow.struct(fields) for record (struct) fields. 
- if field.field_type.upper() in ("RECORD", "STRUCT"): - return None + + if field.field_type.upper() in STRUCT_TYPES: + arrow_fields = [bq_to_arrow_field(subfield) for subfield in field.fields] + return pyarrow.struct(arrow_fields) data_type_constructor = BQ_TO_ARROW_SCALARS.get(field.field_type.upper()) if data_type_constructor is None: @@ -75,6 +86,27 @@ def bq_to_arrow_data_type(field): return data_type_constructor() +def bq_to_arrow_field(bq_field): + """Return the Arrow field, corresponding to a given BigQuery column. + + Returns None if the Arrow type cannot be determined. + """ + arrow_type = bq_to_arrow_data_type(bq_field) + if arrow_type: + is_nullable = bq_field.mode.upper() == "NULLABLE" + return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable) + return None + + +def bq_to_arrow_array(series, bq_field): + arrow_type = bq_to_arrow_data_type(bq_field) + if bq_field.mode.upper() == "REPEATED": + return pyarrow.ListArray.from_pandas(series, type=arrow_type) + if bq_field.field_type.upper() in STRUCT_TYPES: + return pyarrow.StructArray.from_pandas(series, type=arrow_type) + return pyarrow.array(series, type=arrow_type) + + def to_parquet(dataframe, bq_schema, filepath): """Write dataframe as a Parquet file, according to the desired BQ schema. @@ -91,22 +123,18 @@ def to_parquet(dataframe, bq_schema, filepath): Path to write Parquet file to. """ if pyarrow is None: - raise ValueError("pyarrow is required for BigQuery schema conversion") + raise ValueError("pyarrow is required for BigQuery schema conversion.") if len(bq_schema) != len(dataframe.columns): raise ValueError( - "Number of columns in schema must match number of columns in dataframe" + "Number of columns in schema must match number of columns in dataframe." ) arrow_arrays = [] - column_names = [] + arrow_names = [] for bq_field in bq_schema: - column_names.append(bq_field.name) - arrow_arrays.append( - pyarrow.array( - dataframe[bq_field.name], type=bq_to_arrow_data_type(bq_field) - ) - ) + arrow_names.append(bq_field.name) + arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field)) - arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=column_names) + arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) pyarrow.parquet.write_table(arrow_table, filepath) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 8960fe93f4cd..7e7b356f4f8a 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -635,7 +635,7 @@ def test_load_table_from_dataframe_w_nulls(self): See: https://github.com/googleapis/google-cloud-python/issues/7370 """ # Schema with all scalar types. - table_schema = ( + scalars_schema = ( bigquery.SchemaField("bool_col", "BOOLEAN"), bigquery.SchemaField("bytes_col", "BYTES"), bigquery.SchemaField("date_col", "DATE"), @@ -648,6 +648,15 @@ def test_load_table_from_dataframe_w_nulls(self): bigquery.SchemaField("time_col", "TIME"), bigquery.SchemaField("ts_col", "TIMESTAMP"), ) + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. 
See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) num_rows = 100 nulls = [None] * num_rows dataframe = pandas.DataFrame( diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 2b0e3a8b0dbb..a1a4d87ec877 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -114,34 +114,181 @@ def test_all_(): ("DATETIME", "NULLABLE", is_datetime), ("GEOGRAPHY", "NULLABLE", pyarrow.types.is_string), ("UNKNOWN_TYPE", "NULLABLE", is_none), - # TODO: Use pyarrow.struct(fields) for record (struct) fields. - ("RECORD", "NULLABLE", is_none), - ("STRUCT", "NULLABLE", is_none), - # TODO: Use pyarrow.list_(item_type) for repeated (array) fields. - ("STRING", "REPEATED", is_none), - ("STRING", "repeated", is_none), - ("STRING", "RePeAtEd", is_none), - ("BYTES", "REPEATED", is_none), - ("INTEGER", "REPEATED", is_none), - ("INT64", "REPEATED", is_none), - ("FLOAT", "REPEATED", is_none), - ("FLOAT64", "REPEATED", is_none), - ("NUMERIC", "REPEATED", is_none), - ("BOOLEAN", "REPEATED", is_none), - ("BOOL", "REPEATED", is_none), - ("TIMESTAMP", "REPEATED", is_none), - ("DATE", "REPEATED", is_none), - ("TIME", "REPEATED", is_none), - ("DATETIME", "REPEATED", is_none), - ("GEOGRAPHY", "REPEATED", is_none), + # Use pyarrow.list_(item_type) for repeated (array) fields. + ( + "STRING", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "repeated", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "RePeAtEd", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "BYTES", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_binary(type_.value_type), + ), + ), + ( + "INTEGER", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "INT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "FLOAT", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "FLOAT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "NUMERIC", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_numeric(type_.value_type)), + ), + ( + "BOOLEAN", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "BOOL", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "TIMESTAMP", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_timestamp(type_.value_type)), + ), + ( + "DATE", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_date32(type_.value_type), + ), + ), + ( + "TIME", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_time64(type_.value_type), + ), + ), + ( + "DATETIME", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_datetime(type_.value_type)), + ), + ( + "GEOGRAPHY", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), ("RECORD", "REPEATED", is_none), + ("UNKNOWN_TYPE", 
"REPEATED", is_none), ], ) @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_type): field = schema.SchemaField("ignored_name", bq_type, mode=bq_mode) - got = module_under_test.bq_to_arrow_data_type(field) - assert is_correct_type(got) + actual = module_under_test.bq_to_arrow_data_type(field) + assert is_correct_type(actual) + + +@pytest.mark.parametrize( + "bq_type", [("RECORD",), ("record",), ("STRUCT",), ("struct",)] +) +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): + fields = ( + schema.SchemaField("field01", "STRING"), + schema.SchemaField("field02", "BYTES"), + schema.SchemaField("field03", "INTEGER"), + schema.SchemaField("field04", "INT64"), + schema.SchemaField("field05", "FLOAT"), + schema.SchemaField("field06", "FLOAT64"), + schema.SchemaField("field07", "NUMERIC"), + schema.SchemaField("field08", "BOOLEAN"), + schema.SchemaField("field09", "BOOL"), + schema.SchemaField("field10", "TIMESTAMP"), + schema.SchemaField("field11", "DATE"), + schema.SchemaField("field12", "TIME"), + schema.SchemaField("field13", "DATETIME"), + schema.SchemaField("field14", "GEOGRAPHY"), + ) + field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + expected = pyarrow.struct( + ( + pyarrow.field("field01", pyarrow.string()), + pyarrow.field("field02", pyarrow.binary()), + pyarrow.field("field03", pyarrow.int64()), + pyarrow.field("field04", pyarrow.int64()), + pyarrow.field("field05", pyarrow.float64()), + pyarrow.field("field06", pyarrow.float64()), + pyarrow.field("field07", module_under_test.pyarrow_numeric()), + pyarrow.field("field08", pyarrow.bool_()), + pyarrow.field("field09", pyarrow.bool_()), + pyarrow.field("field10", module_under_test.pyarrow_timestamp()), + pyarrow.field("field11", pyarrow.date32()), + pyarrow.field("field12", module_under_test.pyarrow_time()), + pyarrow.field("field13", module_under_test.pyarrow_datetime()), + pyarrow.field("field14", pyarrow.string()), + ) + ) + assert pyarrow.types.is_struct(actual) + assert actual.num_children == len(fields) + assert actual.equals(expected) @pytest.mark.skipIf(pandas is None, "Requires `pandas`") From 6b22c345133dcb9a8c33c4f9dbdec33ca8a4ad90 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 30 May 2019 13:51:00 -0700 Subject: [PATCH 5/6] Improve test coverage with unit tests. 
--- .../google/cloud/bigquery/_pandas_helpers.py | 22 ++- bigquery/tests/unit/test__pandas_helpers.py | 157 +++++++++++++++++- 2 files changed, 170 insertions(+), 9 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index f1353cabb7f1..eeb65e0b9766 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -42,7 +42,7 @@ def pyarrow_timestamp(): return pyarrow.timestamp("us", tz="UTC") -if pyarrow: # pragma: NO COVER +if pyarrow: BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -59,8 +59,21 @@ def pyarrow_timestamp(): "TIME": pyarrow_time, "TIMESTAMP": pyarrow_timestamp, } -else: - BQ_TO_ARROW_SCALARS = {} +else: # pragma: NO COVER + BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER + + +def bq_to_arrow_struct_data_type(field): + arrow_fields = [] + for subfield in field.fields: + arrow_subfield = bq_to_arrow_field(subfield) + if arrow_subfield: + arrow_fields.append(arrow_subfield) + else: + # Could not determine a subfield type. Fallback to type + # inference. + return None + return pyarrow.struct(arrow_fields) def bq_to_arrow_data_type(field): @@ -77,8 +90,7 @@ def bq_to_arrow_data_type(field): return None if field.field_type.upper() in STRUCT_TYPES: - arrow_fields = [bq_to_arrow_field(subfield) for subfield in field.fields] - return pyarrow.struct(arrow_fields) + return bq_to_arrow_struct_data_type(field) data_type_constructor = BQ_TO_ARROW_SCALARS.get(field.field_type.upper()) if data_type_constructor is None: diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index a1a4d87ec877..f04f95307806 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime +import decimal import functools try: @@ -245,9 +247,7 @@ def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_t assert is_correct_type(actual) -@pytest.mark.parametrize( - "bq_type", [("RECORD",), ("record",), ("STRUCT",), ("struct",)] -) +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) @pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): fields = ( @@ -266,7 +266,7 @@ def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): schema.SchemaField("field13", "DATETIME"), schema.SchemaField("field14", "GEOGRAPHY"), ) - field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) + field = schema.SchemaField("ignored_name", bq_type, mode="NULLABLE", fields=fields) actual = module_under_test.bq_to_arrow_data_type(field) expected = pyarrow.struct( ( @@ -291,6 +291,155 @@ def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): assert actual.equals(expected) +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test): + fields = ( + schema.SchemaField("field1", "STRING"), + schema.SchemaField("field2", "INTEGER"), + # Don't know what to convert UNKNOWN_TYPE to, let type inference work, + # instead. 
+ schema.SchemaField("field3", "UNKNOWN_TYPE"), + ) + field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + assert actual is None + + +@pytest.mark.parametrize( + "bq_type,rows", + [ + ("STRING", ["abc", None, "def", None]), + ("BYTES", [b"abc", None, b"def", None]), + ("INTEGER", [123, None, 456, None]), + ("INT64", [-9223372036854775808, None, 9223372036854775807, 123]), + ("FLOAT", [1.25, None, 3.5, None]), + ( + "NUMERIC", + [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + decimal.Decimal("999.123456789"), + ], + ), + ("BOOLEAN", [True, None, False, None]), + ("BOOL", [False, None, True, None]), + # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is + # resolved, test with TIMESTAMP column. Conversion from pyarrow + # TimestampArray to list of Python objects fails with OverflowError: + # Python int too large to convert to C long. + # + # ( + # "TIMESTAMP", + # [ + # datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + # None, + # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc), + # datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + # ], + # ), + ( + "DATE", + [ + datetime.date(1, 1, 1), + None, + datetime.date(9999, 12, 31), + datetime.date(1970, 1, 1), + ], + ), + ( + "TIME", + [ + datetime.time(0, 0, 0), + None, + datetime.time(23, 59, 59, 999999), + datetime.time(12, 0, 0), + ], + ), + # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is + # resolved, test with DATETIME column. Conversion from pyarrow + # TimestampArray to list of Python objects fails with OverflowError: + # Python int too large to convert to C long. 
+ # + # ( + # "DATETIME", + # [ + # datetime.datetime(1, 1, 1, 0, 0, 0), + # None, + # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + # datetime.datetime(1970, 1, 1, 0, 0, 0), + # ], + # ), + ( + "GEOGRAPHY", + [ + "POINT(30, 10)", + None, + "LINESTRING (30 10, 10 30, 40 40)", + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + ), + ], +) +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_array_w_nullable_scalars(module_under_test, bq_type, rows): + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField("field_name", bq_type) + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_array_w_arrays(module_under_test): + rows = [[1, 2, 3], [], [4, 5, 6]] + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField("field_name", "INTEGER", mode="REPEATED") + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_array_w_structs(module_under_test, bq_type): + rows = [ + {"int_col": 123, "string_col": "abc"}, + None, + {"int_col": 456, "string_col": "def"}, + ] + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField( + "field_name", + bq_type, + fields=( + schema.SchemaField("int_col", "INTEGER"), + schema.SchemaField("string_col", "STRING"), + ), + ) + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.skipIf(pandas is None, "Requires `pandas`") +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_array_w_special_floats(module_under_test): + bq_field = schema.SchemaField("field_name", "FLOAT64") + rows = [float("-inf"), float("nan"), float("inf"), None] + series = pandas.Series(rows, dtype="object") + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert len(rows) == len(roundtrip) + assert roundtrip[0] == float("-inf") + assert roundtrip[1] != roundtrip[1] # NaN doesn't equal itself. + assert roundtrip[2] == float("inf") + assert roundtrip[3] is None + + @pytest.mark.skipIf(pandas is None, "Requires `pandas`") def test_to_parquet_without_pyarrow(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "pyarrow", None) From cf402570fd0e3baf17b094bf86e496d288f057f6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 30 May 2019 14:04:40 -0700 Subject: [PATCH 6/6] Add system test for loading dataframe with non-nulls and explicit schema. 
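Unlike the null-only system test added earlier in this series, this test loads concrete boundary values (minimum and maximum dates and datetimes, INTEGER and NUMERIC extremes, +/-inf floats). A minimal sketch of why the frame is built with dtype="object": it keeps Python values such as decimal.Decimal intact, so the explicit BigQuery schema, rather than pandas type inference, decides how each column is encoded in Parquet (the column name is illustrative):

    import decimal
    import pandas

    # NUMERIC boundary values from the test, stored as decimal.Decimal
    # objects in an object column.
    num_col = pandas.Series(
        [
            decimal.Decimal("-99999999999999999999999999999.999999999"),
            None,
            decimal.Decimal("99999999999999999999999999999.999999999"),
        ],
        dtype="object",
    )
    assert num_col.dtype == object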
--- bigquery/tests/system.py | 85 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 7e7b356f4f8a..2b4aa84b8faf 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -27,6 +27,7 @@ import six import pytest +import pytz try: from google.cloud import bigquery_storage_v1beta1 @@ -680,6 +681,9 @@ def test_load_table_from_dataframe_w_nulls(self): table_id = "{}.{}.load_table_from_dataframe_w_nulls".format( Config.CLIENT.project, dataset_id ) + + # Create the table before loading so that schema mismatch errors are + # identified. table = retry_403(Config.CLIENT.create_table)( Table(table_id, schema=table_schema) ) @@ -695,6 +699,87 @@ def test_load_table_from_dataframe_w_nulls(self): self.assertEqual(tuple(table.schema), table_schema) self.assertEqual(table.num_rows, num_rows) + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_explicit_schema(self): + # Schema with all scalar types. + scalars_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. 
See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) + dataframe = pandas.DataFrame( + { + "bool_col": [True, None, False], + "bytes_col": [b"abc", None, b"def"], + "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)], + "dt_col": [ + datetime.datetime(1, 1, 1, 0, 0, 0), + None, + datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + ], + "float_col": [float("-inf"), float("nan"), float("inf")], + "geo_col": [ + "POINT(30 10)", + None, + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + "int_col": [-9223372036854775808, None, 9223372036854775807], + "num_col": [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + ], + "str_col": ["abc", None, "def"], + "time_col": [ + datetime.time(0, 0, 0), + None, + datetime.time(23, 59, 59, 999999), + ], + "ts_col": [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime( + 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc + ), + ], + }, + dtype="object", + ) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema".format( + Config.CLIENT.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table_id) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 3) + def test_load_avro_from_uri_then_dump_table(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SourceFormat
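Taken together, the series gives load_table_from_dataframe the following path when job_config.schema is set: each column is converted with bq_to_arrow_array using the Arrow type implied by its BigQuery field, the arrays are assembled into a pyarrow Table, and the Table is written to a Parquet file that is then uploaded via load_table_from_file. A hedged, self-contained sketch of that helper path (the file path and field names are illustrative):

    import pandas
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import _pandas_helpers

    bq_schema = (
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("scores", "INTEGER", mode="REPEATED"),
    )
    dataframe = pandas.DataFrame(
        {"name": ["abc", None], "scores": [[1, 2, 3], []]}
    )
    # Writes a Parquet file whose column types come from the BigQuery schema
    # rather than from pandas inference; load_table_from_dataframe does the
    # same with a temporary file before calling load_table_from_file.
    _pandas_helpers.to_parquet(dataframe, bq_schema, "/tmp/example.parquet")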