From f30fd6a2cac79286ba7caf3848cb880aabd3160e Mon Sep 17 00:00:00 2001 From: yoshi-automation Date: Thu, 19 Nov 2020 06:06:16 -0800 Subject: [PATCH 1/8] changes without context autosynth cannot find the source of changes triggered by earlier changes in this repository, or by version upgrades to tools such as linters. --- noxfile.py | 2 +- synth.metadata | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index 34936e27..65d0c4b8 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,7 +28,7 @@ DEFAULT_PYTHON_VERSION = "3.8" SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"] -UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8"] +UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"] @nox.session(python=DEFAULT_PYTHON_VERSION) diff --git a/synth.metadata b/synth.metadata index 22c8acb8..6509ff7e 100644 --- a/synth.metadata +++ b/synth.metadata @@ -4,7 +4,7 @@ "git": { "name": ".", "remote": "https://github.com/googleapis/python-bigquery-storage.git", - "sha": "7cf4f32abde1436fbe6c0848647b4ff703e60f85" + "sha": "994a7c1cb1f8008e630d2325a9c168001e5081b4" } }, { From 964fbacab8133dfa89d9adee2f09abb3218ef2d7 Mon Sep 17 00:00:00 2001 From: yoshi-automation Date: Thu, 19 Nov 2020 06:06:51 -0800 Subject: [PATCH 2/8] docs(python): update intersphinx for grpc and auth * docs(python): update intersphinx for grpc and auth * use https for python intersphinx Co-authored-by: Tim Swast Source-Author: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> Source-Date: Wed Nov 18 14:37:25 2020 -0700 Source-Repo: googleapis/synthtool Source-Sha: 9a7d9fbb7045c34c9d3d22c1ff766eeae51f04c9 Source-Link: https://github.com/googleapis/synthtool/commit/9a7d9fbb7045c34c9d3d22c1ff766eeae51f04c9 --- docs/conf.py | 9 ++++++--- synth.metadata | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index e3efb9fd..ce94a6a1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -345,10 +345,13 @@ # Example configuration for intersphinx: refer to the Python standard library. 
intersphinx_mapping = { - "python": ("http://python.readthedocs.org/en/latest/", None), - "google-auth": ("https://google-auth.readthedocs.io/en/stable", None), + "python": ("https://python.readthedocs.org/en/latest/", None), + "google-auth": ( + "https://googleapis.dev/python/google-auth/latest/index.html", + None, + ), "google.api_core": ("https://googleapis.dev/python/google-api-core/latest/", None,), - "grpc": ("https://grpc.io/grpc/python/", None), + "grpc": ("https://grpc.github.io/grpc/python/", None), "proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None), } diff --git a/synth.metadata b/synth.metadata index 6509ff7e..831ec1b1 100644 --- a/synth.metadata +++ b/synth.metadata @@ -19,14 +19,14 @@ "git": { "name": "synthtool", "remote": "https://github.com/googleapis/synthtool.git", - "sha": "1f1148d3c7a7a52f0c98077f976bd9b3c948ee2b" + "sha": "9a7d9fbb7045c34c9d3d22c1ff766eeae51f04c9" } }, { "git": { "name": "synthtool", "remote": "https://github.com/googleapis/synthtool.git", - "sha": "1f1148d3c7a7a52f0c98077f976bd9b3c948ee2b" + "sha": "9a7d9fbb7045c34c9d3d22c1ff766eeae51f04c9" } } ], From 78a97a8faa8e1c64d2cd4e2a177cacc1d12f347a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 8 Dec 2020 15:56:54 -0600 Subject: [PATCH 3/8] remove unnecessary index.html --- docs/conf.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index ce94a6a1..96d96148 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -346,10 +346,7 @@ # Example configuration for intersphinx: refer to the Python standard library. intersphinx_mapping = { "python": ("https://python.readthedocs.org/en/latest/", None), - "google-auth": ( - "https://googleapis.dev/python/google-auth/latest/index.html", - None, - ), + "google-auth": ("https://googleapis.dev/python/google-auth/latest/", None), "google.api_core": ("https://googleapis.dev/python/google-api-core/latest/", None,), "grpc": ("https://grpc.github.io/grpc/python/", None), "proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None), From aac492126fb3ae974b386e13ac8124f77b81703f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Jan 2021 10:07:00 -0600 Subject: [PATCH 4/8] add temporary hack to remove pyarrow from 3.9 builds --- noxfile.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index 65d0c4b8..e2fa5e0b 100644 --- a/noxfile.py +++ b/noxfile.py @@ -75,7 +75,10 @@ def default(session): session.install( "mock", "pytest", "pytest-cov", ) - session.install("-e", ".[fastavro,pandas,pyarrow]") + extras = "fastavro,pandas,pyarrow" + if session.py == "3.9": + extras = "fastavro,pandas" + session.install("-e", f".[{extras}]") # Run py.test against the unit tests. session.run( @@ -129,7 +132,10 @@ def system(session): session.install( "mock", "pytest", "google-cloud-testutils", ) - session.install("-e", ".[fastavro,pandas,pyarrow]") + extras = "fastavro,pandas,pyarrow" + if session.py == "3.9": + extras = "fastavro,pandas" + session.install("-e", f".[{extras}]") # Run py.test against the system tests. 
if system_test_exists: From 0044ee1c9a4fdb316828eb0b6c279f662c4b8b68 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Jan 2021 10:14:55 -0600 Subject: [PATCH 5/8] typo --- noxfile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index e2fa5e0b..c6d9258f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -76,7 +76,7 @@ def default(session): "mock", "pytest", "pytest-cov", ) extras = "fastavro,pandas,pyarrow" - if session.py == "3.9": + if session.python == "3.9": extras = "fastavro,pandas" session.install("-e", f".[{extras}]") @@ -133,7 +133,7 @@ def system(session): "mock", "pytest", "google-cloud-testutils", ) extras = "fastavro,pandas,pyarrow" - if session.py == "3.9": + if session.python == "3.9": extras = "fastavro,pandas" session.install("-e", f".[{extras}]") From 02beef48f1a8ed99f7a27e5a71eb81028976519f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Jan 2021 10:46:13 -0600 Subject: [PATCH 6/8] test: separate pyarrow tests from other tests --- tests/unit/__init__.py | 0 tests/unit/helpers.py | 73 ++++++ tests/unit/test_reader_v1.py | 351 +---------------------------- tests/unit/test_reader_v1_arrow.py | 324 ++++++++++++++++++++++++++ 4 files changed, 398 insertions(+), 350 deletions(-) create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/helpers.py create mode 100644 tests/unit/test_reader_v1_arrow.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/helpers.py b/tests/unit/helpers.py new file mode 100644 index 00000000..712850ff --- /dev/null +++ b/tests/unit/helpers.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +import decimal + +import pytz + + +SCALAR_COLUMNS = [ + {"name": "int_col", "type": "int64"}, + {"name": "float_col", "type": "float64"}, + {"name": "num_col", "type": "numeric"}, + {"name": "bool_col", "type": "bool"}, + {"name": "str_col", "type": "string"}, + {"name": "bytes_col", "type": "bytes"}, + {"name": "date_col", "type": "date"}, + {"name": "time_col", "type": "time"}, + {"name": "ts_col", "type": "timestamp"}, +] +SCALAR_COLUMN_NAMES = [field["name"] for field in SCALAR_COLUMNS] +SCALAR_BLOCKS = [ + [ + { + "int_col": 123, + "float_col": 3.14, + "num_col": decimal.Decimal("9.99"), + "bool_col": True, + "str_col": "hello world", + "bytes_col": b"ascii bytes", + "date_col": datetime.date(1998, 9, 4), + "time_col": datetime.time(12, 0), + "ts_col": datetime.datetime(2000, 1, 1, 5, 0, tzinfo=pytz.utc), + }, + { + "int_col": 456, + "float_col": 2.72, + "num_col": decimal.Decimal("0.99"), + "bool_col": False, + "str_col": "hallo welt", + "bytes_col": b"\xbb\xee\xff", + "date_col": datetime.date(1995, 3, 2), + "time_col": datetime.time(13, 37), + "ts_col": datetime.datetime(1965, 4, 3, 2, 1, tzinfo=pytz.utc), + }, + ], + [ + { + "int_col": 789, + "float_col": 1.23, + "num_col": decimal.Decimal("5.67"), + "bool_col": True, + "str_col": u"こんにちは世界", + "bytes_col": b"\x54\x69\x6d", + "date_col": datetime.date(1970, 1, 1), + "time_col": datetime.time(16, 20), + "ts_col": datetime.datetime(1991, 8, 25, 20, 57, 8, tzinfo=pytz.utc), + } + ], +] diff --git a/tests/unit/test_reader_v1.py b/tests/unit/test_reader_v1.py index 4922ab47..fe815ccb 100644 --- a/tests/unit/test_reader_v1.py +++ b/tests/unit/test_reader_v1.py @@ -14,22 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime -import decimal import itertools import json import fastavro -import pyarrow import mock import pandas import pandas.testing import pytest -import pytz import six import google.api_core.exceptions from google.cloud.bigquery_storage import types +from .helpers import SCALAR_COLUMNS, SCALAR_COLUMN_NAMES, SCALAR_BLOCKS PROJECT = "my-project" @@ -45,71 +42,6 @@ "time": {"type": "long", "logicalType": "time-micros"}, "timestamp": {"type": "long", "logicalType": "timestamp-micros"}, } -# This dictionary is duplicated in bigquery/google/cloud/bigquery/_pandas_helpers.py -# When modifying it be sure to update it there as well. 
-BQ_TO_ARROW_TYPES = { - "int64": pyarrow.int64(), - "float64": pyarrow.float64(), - "bool": pyarrow.bool_(), - "numeric": pyarrow.decimal128(38, 9), - "string": pyarrow.utf8(), - "bytes": pyarrow.binary(), - "date": pyarrow.date32(), # int32 days since epoch - "datetime": pyarrow.timestamp("us"), - "time": pyarrow.time64("us"), - "timestamp": pyarrow.timestamp("us", tz="UTC"), -} -SCALAR_COLUMNS = [ - {"name": "int_col", "type": "int64"}, - {"name": "float_col", "type": "float64"}, - {"name": "num_col", "type": "numeric"}, - {"name": "bool_col", "type": "bool"}, - {"name": "str_col", "type": "string"}, - {"name": "bytes_col", "type": "bytes"}, - {"name": "date_col", "type": "date"}, - {"name": "time_col", "type": "time"}, - {"name": "ts_col", "type": "timestamp"}, -] -SCALAR_COLUMN_NAMES = [field["name"] for field in SCALAR_COLUMNS] -SCALAR_BLOCKS = [ - [ - { - "int_col": 123, - "float_col": 3.14, - "num_col": decimal.Decimal("9.99"), - "bool_col": True, - "str_col": "hello world", - "bytes_col": b"ascii bytes", - "date_col": datetime.date(1998, 9, 4), - "time_col": datetime.time(12, 0), - "ts_col": datetime.datetime(2000, 1, 1, 5, 0, tzinfo=pytz.utc), - }, - { - "int_col": 456, - "float_col": 2.72, - "num_col": decimal.Decimal("0.99"), - "bool_col": False, - "str_col": "hallo welt", - "bytes_col": b"\xbb\xee\xff", - "date_col": datetime.date(1995, 3, 2), - "time_col": datetime.time(13, 37), - "ts_col": datetime.datetime(1965, 4, 3, 2, 1, tzinfo=pytz.utc), - }, - ], - [ - { - "int_col": 789, - "float_col": 1.23, - "num_col": decimal.Decimal("5.67"), - "bool_col": True, - "str_col": u"こんにちは世界", - "bytes_col": b"\x54\x69\x6d", - "date_col": datetime.date(1970, 1, 1), - "time_col": datetime.time(16, 20), - "ts_col": datetime.datetime(1991, 8, 25, 20, 57, 8, tzinfo=pytz.utc), - } - ], -] @pytest.fixture() @@ -145,35 +77,6 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): return avro_blocks -def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): - arrow_batches = [] - for block in bq_blocks: - arrays = [] - for name in arrow_schema.names: - arrays.append( - pyarrow.array( - (row[name] for row in block), - type=arrow_schema.field(name).type, - size=len(block), - ) - ) - arrow_batches.append( - pyarrow.RecordBatch.from_arrays(arrays, schema=arrow_schema) - ) - return arrow_batches - - -def _bq_to_arrow_batches(bq_blocks, arrow_schema): - arrow_batches = [] - for record_batch in _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): - response = types.ReadRowsResponse() - response.arrow_record_batch.serialized_record_batch = ( - record_batch.serialize().to_pybytes() - ) - arrow_batches.append(response) - return arrow_batches - - def _pages_w_nonresumable_internal_error(avro_blocks): for block in avro_blocks: yield block @@ -207,12 +110,6 @@ def _generate_avro_read_session(avro_schema_json): return types.ReadSession(avro_schema={"schema": schema}) -def _generate_arrow_read_session(arrow_schema): - return types.ReadSession( - arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()} - ) - - def _bq_to_avro_schema(bq_columns): fields = [] avro_schema = {"type": "record", "name": "__root__", "fields": fields} @@ -231,20 +128,6 @@ def _bq_to_avro_schema(bq_columns): return avro_schema -def _bq_to_arrow_schema(bq_columns): - def bq_col_as_field(column): - metadata = None - if column.get("description") is not None: - metadata = {"description": column.get("description")} - name = column["name"] - type_ = BQ_TO_ARROW_TYPES[column["type"]] - mode = column.get("mode", 
"nullable").lower() - - return pyarrow.field(name, type_, mode == "nullable", metadata) - - return pyarrow.schema(bq_col_as_field(c) for c in bq_columns) - - def _get_avro_bytes(rows, avro_schema): avro_file = six.BytesIO() for row in rows: @@ -266,20 +149,6 @@ def test_avro_rows_raises_import_error( reader.rows(read_session) -def test_pyarrow_rows_raises_import_error( - mut, class_under_test, mock_gapic_client, monkeypatch -): - monkeypatch.setattr(mut, "pyarrow", None) - reader = class_under_test([], mock_gapic_client, "", 0, {}) - - bq_columns = [{"name": "int_col", "type": "int64"}] - arrow_schema = _bq_to_arrow_schema(bq_columns) - read_session = _generate_arrow_read_session(arrow_schema) - - with pytest.raises(ImportError): - reader.rows(read_session) - - def test_rows_no_schema_set_raises_type_error( mut, class_under_test, mock_gapic_client, monkeypatch ): @@ -300,16 +169,6 @@ def test_rows_w_empty_stream(class_under_test, mock_gapic_client): assert tuple(got) == () -def test_rows_w_empty_stream_arrow(class_under_test, mock_gapic_client): - bq_columns = [{"name": "int_col", "type": "int64"}] - arrow_schema = _bq_to_arrow_schema(bq_columns) - read_session = _generate_arrow_read_session(arrow_schema) - reader = class_under_test([], mock_gapic_client, "", 0, {}) - - got = reader.rows(read_session) - assert tuple(got) == () - - def test_rows_w_scalars(class_under_test, mock_gapic_client): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) read_session = _generate_avro_read_session(avro_schema) @@ -322,21 +181,6 @@ def test_rows_w_scalars(class_under_test, mock_gapic_client): assert got == expected -def test_rows_w_scalars_arrow(class_under_test, mock_gapic_client): - arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) - - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - got = tuple( - dict((key, value.as_py()) for key, value in row_dict.items()) - for row_dict in reader.rows(read_session) - ) - - expected = tuple(itertools.chain.from_iterable(SCALAR_BLOCKS)) - assert got == expected - - def test_rows_w_timeout(class_under_test, mock_gapic_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) @@ -481,37 +325,6 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_gapic_client): assert page_4.remaining == 0 -def test_to_arrow_no_pyarrow_raises_import_error( - mut, class_under_test, mock_gapic_client, monkeypatch -): - monkeypatch.setattr(mut, "pyarrow", None) - arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - - with pytest.raises(ImportError): - reader.to_arrow(read_session) - - with pytest.raises(ImportError): - reader.rows(read_session).to_arrow() - - with pytest.raises(ImportError): - next(reader.rows(read_session).pages).to_arrow() - - -def test_to_arrow_w_scalars_arrow(class_under_test): - arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - actual_table = reader.to_arrow(read_session) - expected_table = pyarrow.Table.from_batches( - _bq_to_arrow_batch_objects(SCALAR_BLOCKS, 
arrow_schema) - ) - assert actual_table == expected_table - - def test_to_dataframe_no_pandas_raises_import_error( mut, class_under_test, mock_gapic_client, monkeypatch ): @@ -572,24 +385,6 @@ def test_to_dataframe_w_scalars(class_under_test): ) -def test_to_dataframe_w_scalars_arrow(class_under_test): - arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) - - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - got = reader.to_dataframe(read_session) - - expected = pandas.DataFrame( - list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES - ) - - pandas.testing.assert_frame_equal( - got.reset_index(drop=True), # reset_index to ignore row labels - expected.reset_index(drop=True), - ) - - def test_to_dataframe_w_dtypes(class_under_test): avro_schema = _bq_to_avro_schema( [ @@ -620,36 +415,6 @@ def test_to_dataframe_w_dtypes(class_under_test): ) -def test_to_dataframe_w_dtypes_arrow(class_under_test): - arrow_schema = _bq_to_arrow_schema( - [ - {"name": "bigfloat", "type": "float64"}, - {"name": "lilfloat", "type": "float64"}, - ] - ) - read_session = _generate_arrow_read_session(arrow_schema) - blocks = [ - [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], - [{"bigfloat": 3.75, "lilfloat": 11.0}], - ] - arrow_batches = _bq_to_arrow_batches(blocks, arrow_schema) - - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) - - expected = pandas.DataFrame( - { - "bigfloat": [1.25, 2.5, 3.75], - "lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"), - }, - columns=["bigfloat", "lilfloat"], - ) - pandas.testing.assert_frame_equal( - got.reset_index(drop=True), # reset_index to ignore row labels - expected.reset_index(drop=True), - ) - - def test_to_dataframe_empty_w_scalars_avro(class_under_test): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) read_session = _generate_avro_read_session(avro_schema) @@ -670,26 +435,6 @@ def test_to_dataframe_empty_w_scalars_avro(class_under_test): ) -def test_to_dataframe_empty_w_scalars_arrow(class_under_test): - arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = _bq_to_arrow_batches([], arrow_schema) - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - - got = reader.to_dataframe(read_session) - - expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES) - expected["int_col"] = expected["int_col"].astype("int64") - expected["float_col"] = expected["float_col"].astype("float64") - expected["bool_col"] = expected["bool_col"].astype("bool") - expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]") - - pandas.testing.assert_frame_equal( - got.reset_index(drop=True), # reset_index to ignore row labels - expected.reset_index(drop=True), - ) - - def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_gapic_client): avro_schema = _bq_to_avro_schema( [ @@ -713,29 +458,6 @@ def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_gapic_client): ) -def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_gapic_client): - arrow_schema = _bq_to_arrow_schema( - [ - {"name": "bigfloat", "type": "float64"}, - {"name": "lilfloat", "type": "float64"}, - ] - ) - read_session = _generate_arrow_read_session(arrow_schema) - arrow_batches = 
_bq_to_arrow_batches([], arrow_schema) - reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) - - got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) - - expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"]) - expected["bigfloat"] = expected["bigfloat"].astype("float64") - expected["lilfloat"] = expected["lilfloat"].astype("float16") - - pandas.testing.assert_frame_equal( - got.reset_index(drop=True), # reset_index to ignore row labels - expected.reset_index(drop=True), - ) - - def test_to_dataframe_by_page(class_under_test, mock_gapic_client): bq_columns = [ {"name": "int_col", "type": "int64"}, @@ -797,74 +519,3 @@ def test_to_dataframe_by_page(class_under_test, mock_gapic_client): drop=True ), ) - - -def test_to_dataframe_by_page_arrow(class_under_test, mock_gapic_client): - bq_columns = [ - {"name": "int_col", "type": "int64"}, - {"name": "bool_col", "type": "bool"}, - ] - arrow_schema = _bq_to_arrow_schema(bq_columns) - read_session = _generate_arrow_read_session(arrow_schema) - - bq_block_1 = [ - {"int_col": 123, "bool_col": True}, - {"int_col": 234, "bool_col": False}, - ] - bq_block_2 = [ - {"int_col": 345, "bool_col": True}, - {"int_col": 456, "bool_col": False}, - ] - bq_block_3 = [ - {"int_col": 567, "bool_col": True}, - {"int_col": 789, "bool_col": False}, - ] - bq_block_4 = [{"int_col": 890, "bool_col": True}] - # Break blocks into two groups to test that iteration continues across - # reconnection. - bq_blocks_1 = [bq_block_1, bq_block_2] - bq_blocks_2 = [bq_block_3, bq_block_4] - batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema) - batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema) - - mock_gapic_client.read_rows.return_value = batch_2 - - reader = class_under_test( - _pages_w_unavailable(batch_1), mock_gapic_client, "", 0, {} - ) - got = reader.rows(read_session) - pages = iter(got.pages) - - page_1 = next(pages) - pandas.testing.assert_frame_equal( - page_1.to_dataframe( - dtypes={"int_col": "int64", "bool_col": "bool"} - ).reset_index(drop=True), - pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index( - drop=True - ), - ) - - page_2 = next(pages) - pandas.testing.assert_frame_equal( - page_2.to_dataframe().reset_index(drop=True), - pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index( - drop=True - ), - ) - - page_3 = next(pages) - pandas.testing.assert_frame_equal( - page_3.to_dataframe().reset_index(drop=True), - pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index( - drop=True - ), - ) - - page_4 = next(pages) - pandas.testing.assert_frame_equal( - page_4.to_dataframe().reset_index(drop=True), - pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index( - drop=True - ), - ) diff --git a/tests/unit/test_reader_v1_arrow.py b/tests/unit/test_reader_v1_arrow.py new file mode 100644 index 00000000..fb02e393 --- /dev/null +++ b/tests/unit/test_reader_v1_arrow.py @@ -0,0 +1,324 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools + +import pandas +import pandas.testing +import pytest + +from google.cloud.bigquery_storage import types +from .helpers import SCALAR_COLUMNS, SCALAR_COLUMN_NAMES, SCALAR_BLOCKS + + +pyarrow = pytest.importorskip("pyarrow") + + +# This dictionary is duplicated in bigquery/google/cloud/bigquery/_pandas_helpers.py +# When modifying it be sure to update it there as well. +BQ_TO_ARROW_TYPES = { + "int64": pyarrow.int64(), + "float64": pyarrow.float64(), + "bool": pyarrow.bool_(), + "numeric": pyarrow.decimal128(38, 9), + "string": pyarrow.utf8(), + "bytes": pyarrow.binary(), + "date": pyarrow.date32(), # int32 days since epoch + "datetime": pyarrow.timestamp("us"), + "time": pyarrow.time64("us"), + "timestamp": pyarrow.timestamp("us", tz="UTC"), +} + + +def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): + arrow_batches = [] + for block in bq_blocks: + arrays = [] + for name in arrow_schema.names: + arrays.append( + pyarrow.array( + (row[name] for row in block), + type=arrow_schema.field(name).type, + size=len(block), + ) + ) + arrow_batches.append( + pyarrow.RecordBatch.from_arrays(arrays, schema=arrow_schema) + ) + return arrow_batches + + +def _bq_to_arrow_batches(bq_blocks, arrow_schema): + arrow_batches = [] + for record_batch in _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): + response = types.ReadRowsResponse() + response.arrow_record_batch.serialized_record_batch = ( + record_batch.serialize().to_pybytes() + ) + arrow_batches.append(response) + return arrow_batches + + +def _bq_to_arrow_schema(bq_columns): + def bq_col_as_field(column): + metadata = None + if column.get("description") is not None: + metadata = {"description": column.get("description")} + name = column["name"] + type_ = BQ_TO_ARROW_TYPES[column["type"]] + mode = column.get("mode", "nullable").lower() + + return pyarrow.field(name, type_, mode == "nullable", metadata) + + return pyarrow.schema(bq_col_as_field(c) for c in bq_columns) + + +def _generate_arrow_read_session(arrow_schema): + return types.ReadSession( + arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()} + ) + + +def test_pyarrow_rows_raises_import_error( + mut, class_under_test, mock_gapic_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + reader = class_under_test([], mock_gapic_client, "", 0, {}) + + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) + + with pytest.raises(ImportError): + reader.rows(read_session) + + +def test_to_arrow_no_pyarrow_raises_import_error( + mut, class_under_test, mock_gapic_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + + with pytest.raises(ImportError): + reader.to_arrow(read_session) + + with pytest.raises(ImportError): + reader.rows(read_session).to_arrow() + + with pytest.raises(ImportError): + next(reader.rows(read_session).pages).to_arrow() + + +def test_to_arrow_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, 
arrow_schema) + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + actual_table = reader.to_arrow(read_session) + expected_table = pyarrow.Table.from_batches( + _bq_to_arrow_batch_objects(SCALAR_BLOCKS, arrow_schema) + ) + assert actual_table == expected_table + + +def test_to_dataframe_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + got = reader.to_dataframe(read_session) + + expected = pandas.DataFrame( + list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES + ) + + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + +def test_rows_w_empty_stream_arrow(class_under_test, mock_gapic_client): + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) + reader = class_under_test([], mock_gapic_client, "", 0, {}) + + got = reader.rows(read_session) + assert tuple(got) == () + + +def test_rows_w_scalars_arrow(class_under_test, mock_gapic_client): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + got = tuple( + dict((key, value.as_py()) for key, value in row_dict.items()) + for row_dict in reader.rows(read_session) + ) + + expected = tuple(itertools.chain.from_iterable(SCALAR_BLOCKS)) + assert got == expected + + +def test_to_dataframe_w_dtypes_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema( + [ + {"name": "bigfloat", "type": "float64"}, + {"name": "lilfloat", "type": "float64"}, + ] + ) + read_session = _generate_arrow_read_session(arrow_schema) + blocks = [ + [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], + [{"bigfloat": 3.75, "lilfloat": 11.0}], + ] + arrow_batches = _bq_to_arrow_batches(blocks, arrow_schema) + + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) + + expected = pandas.DataFrame( + { + "bigfloat": [1.25, 2.5, 3.75], + "lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"), + }, + columns=["bigfloat", "lilfloat"], + ) + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + +def test_to_dataframe_empty_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches([], arrow_schema) + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + + got = reader.to_dataframe(read_session) + + expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES) + expected["int_col"] = expected["int_col"].astype("int64") + expected["float_col"] = expected["float_col"].astype("float64") + expected["bool_col"] = expected["bool_col"].astype("bool") + expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]") + + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + 
expected.reset_index(drop=True), + ) + + +def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_gapic_client): + arrow_schema = _bq_to_arrow_schema( + [ + {"name": "bigfloat", "type": "float64"}, + {"name": "lilfloat", "type": "float64"}, + ] + ) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches([], arrow_schema) + reader = class_under_test(arrow_batches, mock_gapic_client, "", 0, {}) + + got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) + + expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"]) + expected["bigfloat"] = expected["bigfloat"].astype("float64") + expected["lilfloat"] = expected["lilfloat"].astype("float16") + + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + +def test_to_dataframe_by_page_arrow(class_under_test, mock_gapic_client): + bq_columns = [ + {"name": "int_col", "type": "int64"}, + {"name": "bool_col", "type": "bool"}, + ] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) + + bq_block_1 = [ + {"int_col": 123, "bool_col": True}, + {"int_col": 234, "bool_col": False}, + ] + bq_block_2 = [ + {"int_col": 345, "bool_col": True}, + {"int_col": 456, "bool_col": False}, + ] + bq_block_3 = [ + {"int_col": 567, "bool_col": True}, + {"int_col": 789, "bool_col": False}, + ] + bq_block_4 = [{"int_col": 890, "bool_col": True}] + # Break blocks into two groups to test that iteration continues across + # reconnection. + bq_blocks_1 = [bq_block_1, bq_block_2] + bq_blocks_2 = [bq_block_3, bq_block_4] + batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema) + batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema) + + mock_gapic_client.read_rows.return_value = batch_2 + + reader = class_under_test( + _pages_w_unavailable(batch_1), mock_gapic_client, "", 0, {} + ) + got = reader.rows(read_session) + pages = iter(got.pages) + + page_1 = next(pages) + pandas.testing.assert_frame_equal( + page_1.to_dataframe( + dtypes={"int_col": "int64", "bool_col": "bool"} + ).reset_index(drop=True), + pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index( + drop=True + ), + ) + + page_2 = next(pages) + pandas.testing.assert_frame_equal( + page_2.to_dataframe().reset_index(drop=True), + pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index( + drop=True + ), + ) + + page_3 = next(pages) + pandas.testing.assert_frame_equal( + page_3.to_dataframe().reset_index(drop=True), + pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index( + drop=True + ), + ) + + page_4 = next(pages) + pandas.testing.assert_frame_equal( + page_4.to_dataframe().reset_index(drop=True), + pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index( + drop=True + ), + ) From 3f532dff9c4e5db1b652290011c4f23c7ac533b6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Jan 2021 10:52:13 -0600 Subject: [PATCH 7/8] test: lint errors --- tests/unit/test_reader_v1_arrow.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/unit/test_reader_v1_arrow.py b/tests/unit/test_reader_v1_arrow.py index fb02e393..5f7e0e84 100644 --- a/tests/unit/test_reader_v1_arrow.py +++ b/tests/unit/test_reader_v1_arrow.py @@ -15,11 +15,13 @@ # limitations under the License. 
import itertools +from unittest import mock import pandas import pandas.testing import pytest +import google.api_core.exceptions from google.cloud.bigquery_storage import types from .helpers import SCALAR_COLUMNS, SCALAR_COLUMN_NAMES, SCALAR_BLOCKS @@ -43,6 +45,13 @@ } +@pytest.fixture() +def mock_gapic_client(): + from google.cloud.bigquery_storage_v1.services import big_query_read + + return mock.create_autospec(big_query_read.BigQueryReadClient) + + def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): arrow_batches = [] for block in bq_blocks: @@ -92,6 +101,12 @@ def _generate_arrow_read_session(arrow_schema): ) +def _pages_w_unavailable(pages): + for page in pages: + yield page + raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect") + + def test_pyarrow_rows_raises_import_error( mut, class_under_test, mock_gapic_client, monkeypatch ): From 80ec75f24ba0b8c8cfc8109a8265c88d00368394 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Jan 2021 10:55:09 -0600 Subject: [PATCH 8/8] test: add missing fixtures --- tests/unit/test_reader_v1_arrow.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/unit/test_reader_v1_arrow.py b/tests/unit/test_reader_v1_arrow.py index 5f7e0e84..202e0d81 100644 --- a/tests/unit/test_reader_v1_arrow.py +++ b/tests/unit/test_reader_v1_arrow.py @@ -45,6 +45,18 @@ } +@pytest.fixture() +def mut(): + from google.cloud.bigquery_storage_v1 import reader + + return reader + + +@pytest.fixture() +def class_under_test(mut): + return mut.ReadRowsStream + + @pytest.fixture() def mock_gapic_client(): from google.cloud.bigquery_storage_v1.services import big_query_read
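
Editor's note on the pattern these patches rely on: the new tests/unit/test_reader_v1_arrow.py module gates itself on pyarrow with pytest.importorskip, so the unit suite still passes on the Python 3.9 sessions where the temporary noxfile hack above installs only the "fastavro,pandas" extras. Below is a minimal, self-contained sketch of that gating plus the RecordBatch IPC round-trip that the _bq_to_arrow_batches helper depends on. The file name, test name, and values are illustrative assumptions, not part of the patch series.

    # sketch_arrow_gating.py -- illustrative sketch only, not part of these patches
    import pytest

    # Skip every test in this module when pyarrow is not installed
    # (e.g. the Python 3.9 sessions that drop the "pyarrow" extra).
    pyarrow = pytest.importorskip("pyarrow")


    def test_record_batch_ipc_round_trip():
        # Build a tiny batch the same way _bq_to_arrow_batch_objects does,
        # then check it survives serialization to IPC bytes and back -- the
        # same serialize() path the helpers use to fake ReadRowsResponse data.
        schema = pyarrow.schema([pyarrow.field("int_col", pyarrow.int64())])
        batch = pyarrow.RecordBatch.from_arrays(
            [pyarrow.array([123, 456], type=pyarrow.int64())], schema=schema
        )
        restored = pyarrow.ipc.read_record_batch(batch.serialize(), schema)
        assert restored.equals(batch)

With this gating, a 3.9 session without pyarrow reports the module's tests as skipped rather than failed, which is why the split keeps test_reader_v1.py free of pyarrow imports.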