diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py
index 1db1521e5510..5ca85c1b42c8 100644
--- a/bigquery_storage/tests/system/conftest.py
+++ b/bigquery_storage/tests/system/conftest.py
@@ -69,9 +69,9 @@ def table(project_id, dataset, bq_client):
     from google.cloud import bigquery

     schema = [
-        bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"),
+        bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
         bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
-        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+        bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
     ]

     table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users")
@@ -116,8 +116,8 @@ def col_partition_table_ref(project_id, dataset, bq_client):
     from google.cloud import bigquery

     schema = [
-        bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"),
-        bigquery.SchemaField("description", "STRING", mode="REQUIRED"),
+        bigquery.SchemaField("occurred", "DATE", mode="NULLABLE"),
+        bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
     ]
     time_partitioning = bigquery.table.TimePartitioning(
         type_=bigquery.table.TimePartitioningType.DAY, field="occurred"
@@ -144,8 +144,8 @@ def ingest_partition_table_ref(project_id, dataset, bq_client):
     from google.cloud import bigquery

     schema = [
-        bigquery.SchemaField("shape", "STRING", mode="REQUIRED"),
-        bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"),
+        bigquery.SchemaField("shape", "STRING", mode="NULLABLE"),
+        bigquery.SchemaField("altitude", "INT64", mode="NULLABLE"),
     ]
     time_partitioning = bigquery.table.TimePartitioning(
         type_=bigquery.table.TimePartitioningType.DAY,
diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py
index eb02eeea48cd..2ba1f99fb0a0 100644
--- a/bigquery_storage/tests/system/test_reader.py
+++ b/bigquery_storage/tests/system/test_reader.py
@@ -18,8 +18,6 @@
 import copy
 import datetime as dt
 import decimal
-import json
-import io
 import re

 import pytest
@@ -30,42 +28,26 @@
 from google.protobuf import timestamp_pb2


-# TODO: remove once a similar method is implemented in the library itself
-# https://github.com/googleapis/google-cloud-python/issues/4553
-def _add_rows(table_ref, new_data, bq_client, partition_suffix=""):
-    """Insert additional rows into an existing table.
+def _to_bq_table_ref(proto_table_ref, partition_suffix=""):
+    """Converts protobuf table reference to bigquery table reference.

     Args:
-        table_ref (bigquery_storage_v1beta1.types.TableReference):
-            A reference to the target table.
-        new_data (Iterable[Dict[str, Any]]):
-            New data to insert with each row represented as a dictionary.
-            The keys must match the table column names, and the values
-            must be JSON serializable.
-        bq_client (bigquery.Client):
-            A BigQuery client instance to use for API calls.
+        proto_table_ref (bigquery_storage_v1beta1.types.TableReference):
+            A protobuf reference to a table.
         partition_suffix (str):
-            An option suffix to append to the table_id, useful for selecting
+            An optional suffix to append to the table_id, useful for selecting
             partitions of ingestion-time partitioned tables.
-    """
-    job_config = bigquery.LoadJobConfig(
-        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
-    )
-
-    new_data_str = u"\n".join(json.dumps(item) for item in new_data)
-    new_data_file = io.BytesIO(new_data_str.encode())
-
-    destination_ref = bigquery.table.TableReference.from_api_repr(
+
+    Returns:
+        google.cloud.bigquery.table.TableReference
+    """
+    return bigquery.table.TableReference.from_api_repr(
         {
-            "projectId": table_ref.project_id,
-            "datasetId": table_ref.dataset_id,
-            "tableId": table_ref.table_id + partition_suffix,
+            "projectId": proto_table_ref.project_id,
+            "datasetId": proto_table_ref.dataset_id,
+            "tableId": proto_table_ref.table_id + partition_suffix,
         }
     )
-    job = bq_client.load_table_from_file(
-        new_data_file, destination=destination_ref, job_config=job_config
-    )
-    job.result()  # wait for the load to complete


 @pytest.mark.parametrize(
@@ -204,7 +186,9 @@ def test_snapshot(client, project_id, table_with_data_ref, bq_client):
         {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46},
         {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30},
     ]
-    _add_rows(table_with_data_ref, new_data, bq_client)
+
+    destination = _to_bq_table_ref(table_with_data_ref)
+    bq_client.load_table_from_json(new_data, destination).result()

     # read data using the timestamp before the additional data load
     session = client.create_read_session(
@@ -238,7 +222,8 @@ def test_column_partitioned_table(
         {"description": "1 year after false eclipse report.", "occurred": "2019-02-15"},
     ]

-    _add_rows(col_partition_table_ref, data, bq_client)
+    destination = _to_bq_table_ref(col_partition_table_ref)
+    bq_client.load_table_from_json(data, destination).result()

     # Read from the table with a partition filter specified, and verify that
     # only the expected data is returned.
@@ -279,19 +264,28 @@ def test_ingestion_time_partitioned_table(
     client, project_id, ingest_partition_table_ref, bq_client, data_format
 ):
     data = [{"shape": "cigar", "altitude": 1200}, {"shape": "disc", "altitude": 750}]
-    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190809")
+    destination = _to_bq_table_ref(
+        ingest_partition_table_ref, partition_suffix="$20190809"
+    )
+    bq_client.load_table_from_json(data, destination).result()

     data = [
         {"shape": "sphere", "altitude": 3500},
         {"shape": "doughnut", "altitude": 100},
     ]
-    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190810")
+    destination = _to_bq_table_ref(
+        ingest_partition_table_ref, partition_suffix="$20190810"
+    )
+    bq_client.load_table_from_json(data, destination).result()

     data = [
         {"shape": "elephant", "altitude": 1},
         {"shape": "rocket", "altitude": 12700},
     ]
-    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190811")
+    destination = _to_bq_table_ref(
+        ingest_partition_table_ref, partition_suffix="$20190811"
+    )
+    bq_client.load_table_from_json(data, destination).result()

     read_options = bigquery_storage_v1beta1.types.TableReadOptions()
     read_options.row_restriction = "DATE(_PARTITIONTIME) = '2019-08-10'"
@@ -345,7 +339,35 @@ def test_decoding_data_types(
         }
     ]

-    _add_rows(all_types_table_ref, data, bq_client)
+    # Explicit schema is needed to recognize bytes_field as BYTES, and not STRING.
+    # Since partial schemas are not supported in load_table_from_json(), a full
+    # schema needs to be specified.
+    schema = [
+        bigquery.SchemaField("string_field", "STRING"),
+        bigquery.SchemaField("bytes_field", "BYTES"),
+        bigquery.SchemaField("int64_field", "INT64"),
+        bigquery.SchemaField("float64_field", "FLOAT64"),
+        bigquery.SchemaField("numeric_field", "NUMERIC"),
+        bigquery.SchemaField("bool_field", "BOOL"),
+        bigquery.SchemaField("geography_field", "GEOGRAPHY"),
+        bigquery.SchemaField(
+            "person_struct_field",
+            "STRUCT",
+            fields=(
+                bigquery.SchemaField("name", "STRING"),
+                bigquery.SchemaField("age", "INT64"),
+            ),
+        ),
+        bigquery.SchemaField("timestamp_field", "TIMESTAMP"),
+        bigquery.SchemaField("date_field", "DATE"),
+        bigquery.SchemaField("time_field", "TIME"),
+        bigquery.SchemaField("datetime_field", "DATETIME"),
+        bigquery.SchemaField("string_array_field", "STRING", mode="REPEATED"),
+    ]
+
+    job_config = bigquery.LoadJobConfig(schema=schema)
+    destination = _to_bq_table_ref(all_types_table_ref)
+    bq_client.load_table_from_json(data, destination, job_config=job_config).result()

     session = client.create_read_session(
         all_types_table_ref,
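For reference, a minimal standalone sketch of the load pattern the patch switches to: building a BigQuery table reference from its API representation (the role _to_bq_table_ref() plays for the Storage API's protobuf references) and loading rows with load_table_from_json(), passing an explicit schema only when the JSON values alone are ambiguous. The project, dataset, and table names below are hypothetical placeholders, not values from the patch.

# Illustrative sketch only (not part of the patch); names are hypothetical.
from google.cloud import bigquery

bq_client = bigquery.Client()

# Build a TableReference from API-representation fields; an ingestion-time
# partition can be targeted by appending a "$YYYYMMDD" suffix to the table ID.
destination = bigquery.table.TableReference.from_api_repr(
    {
        "projectId": "example-project",
        "datasetId": "example_dataset",
        "tableId": "example_table" + "$20190810",
    }
)

# An explicit schema is only required when a value's type cannot be inferred
# from the JSON alone (e.g. BYTES vs. STRING); otherwise job_config can be omitted.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("shape", "STRING"),
        bigquery.SchemaField("altitude", "INT64"),
    ]
)

rows = [{"shape": "sphere", "altitude": 3500}]
bq_client.load_table_from_json(rows, destination, job_config=job_config).result()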