diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py
index ec450b9aedc7..79b9d3689512 100644
--- a/bigquery_storage/noxfile.py
+++ b/bigquery_storage/noxfile.py
@@ -118,6 +118,7 @@ def system(session):
     session.install("-e", local_dep)
     session.install("-e", "../test_utils/")
     session.install("-e", ".[fastavro,pandas,pyarrow]")
+    session.install("-e", "../bigquery/")
     session.install("-e", ".")
 
     # Run py.test against the system tests.
diff --git a/bigquery_storage/tests/system/assets/people_data.csv b/bigquery_storage/tests/system/assets/people_data.csv
new file mode 100644
index 000000000000..819adfc4bdf5
--- /dev/null
+++ b/bigquery_storage/tests/system/assets/people_data.csv
@@ -0,0 +1,6 @@
+first_name,last_name,age
+John,Doe,42
+Jack,Black,53
+Nick,Sleek,24
+Kevin,Powell,50
+Johnny,Young,2
diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py
index f88a38e43e9a..1db1521e5510 100644
--- a/bigquery_storage/tests/system/conftest.py
+++ b/bigquery_storage/tests/system/conftest.py
@@ -16,20 +16,203 @@
 """System tests for reading rows from tables."""
 
 import os
+import uuid
 
 import pytest
 
 from google.cloud import bigquery_storage_v1beta1
 
 
-@pytest.fixture()
+_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")
+
+
+@pytest.fixture(scope="session")
 def project_id():
     return os.environ["PROJECT_ID"]
 
 
-@pytest.fixture()
-def client():
-    return bigquery_storage_v1beta1.BigQueryStorageClient()
+@pytest.fixture(scope="session")
+def credentials():
+    from google.oauth2 import service_account
+
+    # NOTE: the test config in noxfile checks that the env variable is indeed set
+    filename = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+    return service_account.Credentials.from_service_account_file(filename)
+
+
+@pytest.fixture(scope="session")
+def bq_client(credentials):
+    from google.cloud import bigquery
+
+    return bigquery.Client(credentials=credentials)
+
+
+@pytest.fixture(scope="session")
+def dataset(project_id, bq_client):
+    from google.cloud import bigquery
+
+    unique_suffix = str(uuid.uuid4()).replace("-", "_")
+    dataset_name = "bq_storage_system_tests_" + unique_suffix
+
+    dataset_id = "{}.{}".format(project_id, dataset_name)
+    dataset = bigquery.Dataset(dataset_id)
+    dataset.location = "US"
+    created_dataset = bq_client.create_dataset(dataset)
+
+    yield created_dataset
+
+    bq_client.delete_dataset(dataset, delete_contents=True)
+
+
+@pytest.fixture(scope="session")
+def table(project_id, dataset, bq_client):
+    from google.cloud import bigquery
+
+    schema = [
+        bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"),
+        bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
+        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+    ]
+
+    table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users")
+    bq_table = bigquery.Table(table_id, schema=schema)
+    created_table = bq_client.create_table(bq_table)
+
+    yield created_table
+
+    bq_client.delete_table(created_table)
+
+
+@pytest.fixture
+def table_with_data_ref(dataset, table, bq_client):
+    from google.cloud import bigquery
+
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.CSV
+    job_config.skip_leading_rows = 1
+    job_config.schema = table.schema
+
+    filename = os.path.join(_ASSETS_DIR, "people_data.csv")
+
+    with open(filename, "rb") as source_file:
+        job = bq_client.load_table_from_file(source_file, table, job_config=job_config)
+
+    job.result()  # wait for the load to complete
+
+    table_ref = bigquery_storage_v1beta1.types.TableReference()
+    table_ref.project_id = table.project
+    table_ref.dataset_id = table.dataset_id
+    table_ref.table_id = table.table_id
+    yield table_ref
+
+    # truncate table data
+    query = "DELETE FROM {}.{} WHERE 1 = 1".format(dataset.dataset_id, table.table_id)
+    query_job = bq_client.query(query, location="US")
+    query_job.result()
+
+
+@pytest.fixture
+def col_partition_table_ref(project_id, dataset, bq_client):
+    from google.cloud import bigquery
+
+    schema = [
+        bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"),
+        bigquery.SchemaField("description", "STRING", mode="REQUIRED"),
+    ]
+    time_partitioning = bigquery.table.TimePartitioning(
+        type_=bigquery.table.TimePartitioningType.DAY, field="occurred"
+    )
+    bq_table = bigquery.table.Table(
+        table_ref="{}.{}.notable_events".format(project_id, dataset.dataset_id),
+        schema=schema,
+    )
+    bq_table.time_partitioning = time_partitioning
+
+    created_table = bq_client.create_table(bq_table)
+
+    table_ref = bigquery_storage_v1beta1.types.TableReference()
+    table_ref.project_id = created_table.project
+    table_ref.dataset_id = created_table.dataset_id
+    table_ref.table_id = created_table.table_id
+    yield table_ref
+
+    bq_client.delete_table(created_table)
+
+
+@pytest.fixture
+def ingest_partition_table_ref(project_id, dataset, bq_client):
+    from google.cloud import bigquery
+
+    schema = [
+        bigquery.SchemaField("shape", "STRING", mode="REQUIRED"),
+        bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"),
+    ]
+    time_partitioning = bigquery.table.TimePartitioning(
+        type_=bigquery.table.TimePartitioningType.DAY,
+        field=None,  # use _PARTITIONTIME pseudo column
+    )
+    bq_table = bigquery.table.Table(
+        table_ref="{}.{}.ufo_sightings".format(project_id, dataset.dataset_id),
+        schema=schema,
+    )
+    bq_table.time_partitioning = time_partitioning
+
+    created_table = bq_client.create_table(bq_table)
+
+    table_ref = bigquery_storage_v1beta1.types.TableReference()
+    table_ref.project_id = created_table.project
+    table_ref.dataset_id = created_table.dataset_id
+    table_ref.table_id = created_table.table_id
+    yield table_ref
+
+    bq_client.delete_table(created_table)
+
+
+@pytest.fixture
+def all_types_table_ref(project_id, dataset, bq_client):
+    from google.cloud import bigquery
+
+    schema = [
+        bigquery.SchemaField("string_field", "STRING"),
+        bigquery.SchemaField("bytes_field", "BYTES"),
+        bigquery.SchemaField("int64_field", "INT64"),
+        bigquery.SchemaField("float64_field", "FLOAT64"),
+        bigquery.SchemaField("numeric_field", "NUMERIC"),
+        bigquery.SchemaField("bool_field", "BOOL"),
+        bigquery.SchemaField("geography_field", "GEOGRAPHY"),
+        bigquery.SchemaField(
+            "person_struct_field",
+            "STRUCT",
+            fields=(
+                bigquery.SchemaField("name", "STRING"),
+                bigquery.SchemaField("age", "INT64"),
+            ),
+        ),
+        bigquery.SchemaField("timestamp_field", "TIMESTAMP"),
+        bigquery.SchemaField("date_field", "DATE"),
+        bigquery.SchemaField("time_field", "TIME"),
+        bigquery.SchemaField("datetime_field", "DATETIME"),
+        bigquery.SchemaField("string_array_field", "STRING", mode="REPEATED"),
+    ]
+    bq_table = bigquery.table.Table(
+        table_ref="{}.{}.complex_records".format(project_id, dataset.dataset_id),
+        schema=schema,
+    )
+
+    created_table = bq_client.create_table(bq_table)
+
+    table_ref = bigquery_storage_v1beta1.types.TableReference()
+    table_ref.project_id = created_table.project
+    table_ref.dataset_id = created_table.dataset_id
+    table_ref.table_id = created_table.table_id
+    yield table_ref
+
+    bq_client.delete_table(created_table)
+
+
+@pytest.fixture(scope="session")
+def client(credentials):
+    return bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
 
 
 @pytest.fixture()
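Aside, not part of the patch: each of the `*_table_ref` fixtures above repeats the same conversion from a `google.cloud.bigquery` table into a storage-API reference. As a sketch (the helper name is hypothetical), that shared piece amounts to:

from google.cloud import bigquery_storage_v1beta1


def _to_table_ref(bq_table):
    # Mirror the per-fixture wiring: copy the project, dataset, and table IDs
    # from a google.cloud.bigquery Table into a storage API TableReference.
    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = bq_table.project
    table_ref.dataset_id = bq_table.dataset_id
    table_ref.table_id = bq_table.table_id
    return table_ref
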
diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py
index 13e72fafeceb..eb02eeea48cd 100644
--- a/bigquery_storage/tests/system/test_reader.py
+++ b/bigquery_storage/tests/system/test_reader.py
@@ -15,9 +15,57 @@
 # limitations under the License.
 """System tests for reading rows from tables."""
 
+import copy
+import datetime as dt
+import decimal
+import json
+import io
+import re
+
 import pytest
+import pytz
 
+from google.cloud import bigquery
 from google.cloud import bigquery_storage_v1beta1
+from google.protobuf import timestamp_pb2
+
+
+# TODO: remove once a similar method is implemented in the library itself
+# https://github.com/googleapis/google-cloud-python/issues/4553
+def _add_rows(table_ref, new_data, bq_client, partition_suffix=""):
+    """Insert additional rows into an existing table.
+
+    Args:
+        table_ref (bigquery_storage_v1beta1.types.TableReference):
+            A reference to the target table.
+        new_data (Iterable[Dict[str, Any]]):
+            New data to insert with each row represented as a dictionary.
+            The keys must match the table column names, and the values
+            must be JSON serializable.
+        bq_client (bigquery.Client):
+            A BigQuery client instance to use for API calls.
+        partition_suffix (str):
+            An optional suffix to append to the table_id, useful for selecting
+            partitions of ingestion-time partitioned tables.
+    """
+    job_config = bigquery.LoadJobConfig(
+        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
+    )
+
+    new_data_str = u"\n".join(json.dumps(item) for item in new_data)
+    new_data_file = io.BytesIO(new_data_str.encode())
+
+    destination_ref = bigquery.table.TableReference.from_api_repr(
+        {
+            "projectId": table_ref.project_id,
+            "datasetId": table_ref.dataset_id,
+            "tableId": table_ref.table_id + partition_suffix,
+        }
+    )
+    job = bq_client.load_table_from_file(
+        new_data_file, destination=destination_ref, job_config=job_config
+    )
+    job.result()  # wait for the load to complete
 
 
 @pytest.mark.parametrize(
@@ -73,3 +121,314 @@ def test_read_rows_as_rows_full_table(
     rows = list(client.read_rows(stream_pos).rows(session))
 
     assert len(rows) > 0
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_basic_nonfiltered_read(client, project_id, table_with_data_ref, data_format):
+    session = client.create_read_session(
+        table_with_data_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+    )
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    assert len(rows) == 5  # all table rows
+
+
+def test_filtered_rows_read(client, project_id, table_with_data_ref):
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    read_options.row_restriction = "age >= 50"
+
+    session = client.create_read_session(
+        table_with_data_ref,
+        "projects/{}".format(project_id),
+        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
+        requested_streams=1,
+        read_options=read_options,
+    )
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    assert len(rows) == 2
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_column_selection_read(client, project_id, table_with_data_ref, data_format):
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    read_options.selected_fields.append("first_name")
+    read_options.selected_fields.append("age")
+
+    session = client.create_read_session(
+        table_with_data_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+        read_options=read_options,
+    )
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    for row in rows:
+        assert sorted(row.keys()) == ["age", "first_name"]
+
+
+def test_snapshot(client, project_id, table_with_data_ref, bq_client):
+    before_new_data = timestamp_pb2.Timestamp()
+    before_new_data.GetCurrentTime()
+
+    # load additional data into the table
+    new_data = [
+        {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46},
+        {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30},
+    ]
+    _add_rows(table_with_data_ref, new_data, bq_client)
+
+    # read data using the timestamp before the additional data load
+    session = client.create_read_session(
+        table_with_data_ref,
+        "projects/{}".format(project_id),
+        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
+        requested_streams=1,
+        table_modifiers={"snapshot_time": before_new_data},
+    )
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    # verify that only the data before the timestamp was returned
+    assert len(rows) == 5  # all initial records
+
+    for row in rows:
+        assert "NewGuy" not in row["first_name"]  # no new records
+
+
+def test_column_partitioned_table(
+    client, project_id, col_partition_table_ref, bq_client
+):
+    data = [
+        {"description": "Tracking established.", "occurred": "2017-02-15"},
+        {"description": "Look, a solar eclipse!", "occurred": "2018-02-15"},
+        {"description": "Fake solar eclipse reported.", "occurred": "2018-02-15"},
+        {"description": "1 day after false eclipse report.", "occurred": "2018-02-16"},
+        {"description": "1 year after false eclipse report.", "occurred": "2019-02-15"},
+    ]
+
+    _add_rows(col_partition_table_ref, data, bq_client)
+
+    # Read from the table with a partition filter specified, and verify that
+    # only the expected data is returned.
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    read_options.row_restriction = "occurred = '2018-02-15'"
+
+    session = client.create_read_session(
+        col_partition_table_ref,
+        "projects/{}".format(project_id),
+        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
+        requested_streams=1,
+        read_options=read_options,
+    )
+
+    assert session.streams  # there should be some data to fetch
+
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    assert len(rows) == 2
+
+    expected_descriptions = ("Look, a solar eclipse!", "Fake solar eclipse reported.")
+    for row in rows:
+        assert row["occurred"] == dt.date(2018, 2, 15)
+        assert row["description"] in expected_descriptions
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_ingestion_time_partitioned_table(
+    client, project_id, ingest_partition_table_ref, bq_client, data_format
+):
+    data = [{"shape": "cigar", "altitude": 1200}, {"shape": "disc", "altitude": 750}]
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190809")
+
+    data = [
+        {"shape": "sphere", "altitude": 3500},
+        {"shape": "doughnut", "altitude": 100},
+    ]
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190810")
+
+    data = [
+        {"shape": "elephant", "altitude": 1},
+        {"shape": "rocket", "altitude": 12700},
+    ]
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190811")
+
+    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    read_options.row_restriction = "DATE(_PARTITIONTIME) = '2019-08-10'"
+
+    session = client.create_read_session(
+        ingest_partition_table_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+        read_options=read_options,
+    )
+
+    assert session.streams  # there should be some data to fetch
+
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+    rows = list(client.read_rows(stream_pos).rows(session))
+    assert len(rows) == 2
+
+    actual_items = {(row["shape"], row["altitude"]) for row in rows}
+    expected_items = {("sphere", 3500), ("doughnut", 100)}
+    assert actual_items == expected_items
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_decoding_data_types(
+    client, project_id, all_types_table_ref, bq_client, data_format
+):
+    data = [
+        {
+            u"string_field": u"Price: € 9.95.",
+            u"bytes_field": bigquery._helpers._bytes_to_json(b"byteees"),
+            u"int64_field": -1085,
+            u"float64_field": -42.195,
+            u"numeric_field": "1.4142",
+            u"bool_field": True,
+            u"geography_field": '{"type": "Point", "coordinates": [-49.3028, 69.0622]}',
+            u"person_struct_field": {u"name": u"John", u"age": 42},
+            u"timestamp_field": 1565357902.017896,  # 2019-08-09T13:38:22.017896
+            u"date_field": u"1995-03-17",
+            u"time_field": u"16:24:51",
+            u"datetime_field": u"2005-10-26T19:49:41",
+            u"string_array_field": [u"foo", u"bar", u"baz"],
+        }
+    ]
+
+    _add_rows(all_types_table_ref, data, bq_client)
+
+    session = client.create_read_session(
+        all_types_table_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+    )
+
+    assert session.streams  # there should be data available
+
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=session.streams[0]
+    )
+
+    rows = list(client.read_rows(stream_pos).rows(session))
+
+    expected_result = {
+        u"string_field": u"Price: € 9.95.",
+        u"bytes_field": b"byteees",
+        u"int64_field": -1085,
+        u"float64_field": -42.195,
+        u"numeric_field": decimal.Decimal("1.4142"),
+        u"bool_field": True,
+        u"geography_field": "POINT(-49.3028 69.0622)",
+        u"person_struct_field": {u"name": u"John", u"age": 42},
+        u"timestamp_field": dt.datetime(2019, 8, 9, 13, 38, 22, 17896, tzinfo=pytz.UTC),
+        u"date_field": dt.date(1995, 3, 17),
+        u"time_field": dt.time(16, 24, 51),
+        u"string_array_field": [u"foo", u"bar", u"baz"],
+    }
+
+    result_copy = copy.copy(rows[0])
+    del result_copy["datetime_field"]
+    assert result_copy == expected_result
+
+    # Compare datetime separately: AVRO and PYARROW return different object types,
+    # although they should both represent the same value.
+    # TODO: when fixed, change assertion to assert a datetime instance!
+    expected_pattern = re.compile(r"2005-10-26( |T)19:49:41")
+    assert expected_pattern.match(str(rows[0]["datetime_field"]))
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_resuming_read_from_offset(client, project_id, data_format):
+    shakespeare_ref = bigquery_storage_v1beta1.types.TableReference()
+    shakespeare_ref.project_id = project_id
+    shakespeare_ref.dataset_id = "public_samples_copy"
+    shakespeare_ref.table_id = "shakespeare"
+
+    read_session = client.create_read_session(
+        shakespeare_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+    )
+
+    assert read_session.streams  # there should be data available
+
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=read_session.streams[0], offset=0
+    )
+    read_rows_stream = client.read_rows(stream_pos)
+
+    # fetch the first two batches of rows
+    rows_iter = iter(read_rows_stream)
+    some_rows = next(rows_iter)
+    more_rows = next(rows_iter)
+
+    # fetch the rest of the rows using the stream offset
+    new_stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=read_session.streams[0], offset=some_rows.row_count + more_rows.row_count
+    )
+    remaining_rows_count = sum(
+        1 for _ in client.read_rows(new_stream_pos).rows(read_session)
+    )
+
+    # verify that the counts match
+    expected_len = 164656  # total rows in shakespeare table
+    actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count
+    assert actual_len == expected_len
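
For reference, the read pattern every test above follows, reduced to a minimal sketch outside the fixtures (placeholder project, dataset, and table IDs; default application credentials assumed):

from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Placeholder identifiers -- substitute a table readable by the caller.
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "my-project"
table_ref.dataset_id = "my_dataset"
table_ref.table_id = "my_table"

session = client.create_read_session(
    table_ref,
    "projects/{}".format("my-project"),
    format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
    requested_streams=1,
)

# Read every row from the first (and only) stream of the session.
stream_pos = bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
rows = list(client.read_rows(stream_pos).rows(session))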