diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index b9a414eb76ef..c36c3a28e30e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -44,6 +44,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils.timestamp import Timestamp # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -185,7 +186,7 @@ def test_table_schema_retrieve(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -194,16 +195,32 @@ def test_table_schema_retrieve(self): p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( gcs_location="gs://bqio_schema_test", dataset="beam_bigquery_io_test", - table="dfsqltable_3c7d6fd5_16e0460dfd0", + table="table_schema_retrieve", project="apache-beam-testing", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') + utype( + id=1, + name='customer1', + type='test', + times=Timestamp(1633262400)), + utype( + id=3, + name='customer1', + type='test', + times=Timestamp(1664798400)), + utype( + id=2, + name='customer2', + type='test', + times=Timestamp(1601726400)), + utype( + id=4, + name='customer2', + type='test', + times=Timestamp(1570104000)) ])) @pytest.mark.it_postcommit @@ -211,7 +228,7 @@ def test_table_schema_retrieve_specifying_only_table(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -221,15 +238,31 @@ def test_table_schema_retrieve_specifying_only_table(self): gcs_location="gs://bqio_schema_test", table="apache-beam-testing:" "beam_bigquery_io_test." - "dfsqltable_3c7d6fd5_16e0460dfd0", + "table_schema_retrieve", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') + utype( + id=1, + name='customer1', + type='test', + times=Timestamp(1633262400)), + utype( + id=3, + name='customer1', + type='test', + times=Timestamp(1664798400)), + utype( + id=2, + name='customer2', + type='test', + times=Timestamp(1601726400)), + utype( + id=4, + name='customer2', + type='test', + times=Timestamp(1570104000)) ])) @pytest.mark.it_postcommit @@ -237,7 +270,7 @@ def test_table_schema_retrieve_with_direct_read(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -247,15 +280,31 @@ def test_table_schema_retrieve_with_direct_read(self): method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, table="apache-beam-testing:" "beam_bigquery_io_test." - "dfsqltable_3c7d6fd5_16e0460dfd0", + "table_schema_retrieve", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') + utype( + id=1, + name='customer1', + type='test', + times=Timestamp(1633262400)), + utype( + id=3, + name='customer1', + type='test', + times=Timestamp(1664798400)), + utype( + id=2, + name='customer2', + type='test', + times=Timestamp(1601726400)), + utype( + id=4, + name='customer2', + type='test', + times=Timestamp(1570104000)) ])) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index a36c38988cde..e78f7bd5a7f7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -21,12 +21,14 @@ NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ +import datetime from typing import Optional from typing import Sequence import numpy as np import apache_beam as beam +import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -38,8 +40,10 @@ "STRING": str, "INTEGER": np.int64, "FLOAT64": np.float64, + "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } @@ -76,11 +80,11 @@ def generate_user_type_from_bq_schema(the_table_schema): def bq_field_to_type(field, mode): - if mode == 'NULLABLE': + if mode == 'NULLABLE' or mode is None or mode == '': return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] elif mode == 'REPEATED': return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] - elif mode is None or mode == '': + elif mode == 'REQUIRED': return BIG_QUERY_TO_PYTHON_TYPES[field] else: raise ValueError(f"Encountered an unsupported mode: {mode!r}") @@ -94,11 +98,13 @@ def convert_to_usertype(table_schema): class BeamSchemaConversionDoFn(beam.DoFn): - # Converting a dictionary of tuples to a usertype. def __init__(self, pcoll_val_ctor): self._pcoll_val_ctor = pcoll_val_ctor def process(self, dict_of_tuples): + for k, v in dict_of_tuples.items(): + if isinstance(v, datetime.datetime): + dict_of_tuples[k] = beam.utils.timestamp.Timestamp.from_utc_datetime(v) yield self._pcoll_val_ctor(**dict_of_tuples) def infer_output_type(self, input_type): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 9187ec27f9dd..dcd548d22d92 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -50,7 +50,7 @@ def test_check_schema_conversions(self): { 'stn': typing.Optional[str], 'temp': typing.Sequence[np.float64], - 'count': np.int64 + 'count': typing.Optional[np.int64] }) def test_check_conversion_with_empty_schema(self): @@ -61,6 +61,25 @@ def test_check_conversion_with_empty_schema(self): the_table_schema=schema) self.assertEqual(usertype.__annotations__, {}) + def test_check_schema_conversions_with_timestamp(self): + fields = [ + bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema( + name='times', type='TIMESTAMP', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + the_table_schema=schema) + self.assertEqual( + usertype.__annotations__, + { + 'stn': typing.Optional[str], + 'temp': typing.Sequence[np.float64], + 'times': typing.Optional[apache_beam.utils.timestamp.Timestamp] + }) + def test_unsupported_type(self): fields = [ bigquery.TableFieldSchema(