From 277cb405dc8dc454ff056ecac3a2e626e2eda0e7 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Wed, 7 Sep 2022 16:39:32 -0400 Subject: [PATCH 01/17] why rowcoder? --- .../io/gcp/bigquery_read_it_test.py | 12 ++++++++++++ .../io/gcp/bigquery_schema_tools.py | 6 ++++-- .../io/gcp/bigquery_schema_tools_test.py | 19 +++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index b9a414eb76ef..e8c88c3d8cf4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -258,6 +258,18 @@ def test_table_schema_retrieve_with_direct_read(self): utype(id=4, name='customer2', type='test') ])) + @pytest.mark.it_postcommit + def test_table_schema_retrieve_with_timestamp(self): + with beam.Pipeline(argv=self.args) as p: + result = ( + p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + gcs_location="gs://bqio_schema_test", + dataset="beam_bigquery_io_test", + table="taxi", + project="apache-beam-testing", + output_type='BEAM_ROW')) + assert_that(result, equal_to([])) + class ReadUsingStorageApiTests(BigQueryReadIntegrationTests): TABLE_DATA = [{ diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index a36c38988cde..b5c1117bb568 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -20,13 +20,13 @@ backwards compatibility guarantees. NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ - from typing import Optional from typing import Sequence import numpy as np import apache_beam as beam +import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -38,8 +38,10 @@ "STRING": str, "INTEGER": np.int64, "FLOAT64": np.float64, + "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } @@ -80,7 +82,7 @@ def bq_field_to_type(field, mode): return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] elif mode == 'REPEATED': return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] - elif mode is None or mode == '': + elif mode is None or mode == '' or mode == 'REQUIRED': return BIG_QUERY_TO_PYTHON_TYPES[field] else: raise ValueError(f"Encountered an unsupported mode: {mode!r}") diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 9187ec27f9dd..37ff5787e37e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -61,6 +61,25 @@ def test_check_conversion_with_empty_schema(self): the_table_schema=schema) self.assertEqual(usertype.__annotations__, {}) + def test_check_schema_conversions_with_timestamp(self): + fields = [ + bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema( + name='times', type='TIMESTAMP', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + the_table_schema=schema) + self.assertEqual( + usertype.__annotations__, + { + 'stn': typing.Optional[str], + 'temp': typing.Sequence[np.float64], + 'times': typing.Optional[apache_beam.utils.timestamp.Timestamp] + }) + def test_unsupported_type(self): fields = [ bigquery.TableFieldSchema( From a90cef70f2642c550d844743726fb17a636720f9 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Wed, 14 Sep 2022 17:56:57 -0400 Subject: [PATCH 02/17] registering datetime.datetime as schema type --- sdks/python/apache_beam/typehints/schemas.py | 33 ++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 4b312d96ce9f..bca6acce1e79 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -728,6 +728,39 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) +# Make sure MicrosInstant is registered after MillisInstant so that it +# overwrites the mapping of datetime.datetime +# language type representation choice and +# thus does not lose microsecond precision inside python sdk. +@LogicalType.register_logical_type +class MicrosInstantDatetime(NoArgumentLogicalType[datetime.datetime, + MicrosInstantRepresentation]): + """Microsecond-precision instant logical type + that handles ``datetime.datetime``.""" + @classmethod + def urn(cls): + return common_urns.micros_instant.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return MicrosInstantRepresentation + + @classmethod + def language_type(cls): + return datetime.datetime + + def to_representation_type(self, value): + # type: (datetime.datetime) -> MicrosInstantRepresentation + return MicrosInstantRepresentation( + value.microsecond // 1000000, value.microsecond % 1000000) + + def to_language_type(self, value): + # type: (MicrosInstantRepresentation) -> datetime.datetime + return datetime.datetime( + second=int(value.seconds), microsecond=int(value.micros)) + + @LogicalType.register_logical_type class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): """A logical type for PythonCallableSource objects.""" From 509a16491e6c70af5647324f3bf6d4fb0c808e6d Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Wed, 14 Sep 2022 18:05:57 -0400 Subject: [PATCH 03/17] registering datetime.datetime as schema type --- sdks/python/apache_beam/typehints/schemas.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index bca6acce1e79..328179e1281c 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -55,6 +55,7 @@ # pytype: skip-file +from datetime import datetime from typing import Any from typing import ByteString from typing import Dict @@ -733,7 +734,7 @@ def to_language_type(self, value): # language type representation choice and # thus does not lose microsecond precision inside python sdk. @LogicalType.register_logical_type -class MicrosInstantDatetime(NoArgumentLogicalType[datetime.datetime, +class MicrosInstantDatetime(NoArgumentLogicalType[datetime, MicrosInstantRepresentation]): """Microsecond-precision instant logical type that handles ``datetime.datetime``.""" @@ -748,17 +749,16 @@ def representation_type(cls): @classmethod def language_type(cls): - return datetime.datetime + return datetime def to_representation_type(self, value): - # type: (datetime.datetime) -> MicrosInstantRepresentation + # type: (datetime) -> MicrosInstantRepresentation return MicrosInstantRepresentation( value.microsecond // 1000000, value.microsecond % 1000000) def to_language_type(self, value): - # type: (MicrosInstantRepresentation) -> datetime.datetime - return datetime.datetime( - second=int(value.seconds), microsecond=int(value.micros)) + # type: (MicrosInstantRepresentation) -> datetime + return datetime(second=int(value.seconds), microsecond=int(value.micros)) @LogicalType.register_logical_type From 7e4b41754753fb1fb3df5174f3aa6733c3b1424d Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Thu, 15 Sep 2022 15:02:11 -0400 Subject: [PATCH 04/17] registered type in schemas.py --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index b5c1117bb568..61ccd715f409 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -20,13 +20,13 @@ backwards compatibility guarantees. NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ +import datetime from typing import Optional from typing import Sequence import numpy as np import apache_beam as beam -import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -41,7 +41,7 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": apache_beam.utils.timestamp.Timestamp + "TIMESTAMP": datetime.datetime #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } From 30b0d99dd9c2f4902eb63c091356073739b9b7c1 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Thu, 15 Sep 2022 15:02:58 -0400 Subject: [PATCH 05/17] registered type in schemas.py --- sdks/python/apache_beam/typehints/schemas.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 328179e1281c..8923cb312429 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -730,14 +730,12 @@ def to_language_type(self, value): # Make sure MicrosInstant is registered after MillisInstant so that it -# overwrites the mapping of datetime.datetime -# language type representation choice and +# overwrites the mapping of Timestamp language type representation choice and # thus does not lose microsecond precision inside python sdk. @LogicalType.register_logical_type class MicrosInstantDatetime(NoArgumentLogicalType[datetime, MicrosInstantRepresentation]): - """Microsecond-precision instant logical type - that handles ``datetime.datetime``.""" + """Microsecond-precision instant logical type that handles ``Timestamp``.""" @classmethod def urn(cls): return common_urns.micros_instant.urn @@ -757,8 +755,8 @@ def to_representation_type(self, value): value.microsecond // 1000000, value.microsecond % 1000000) def to_language_type(self, value): - # type: (MicrosInstantRepresentation) -> datetime - return datetime(second=int(value.seconds), microsecond=int(value.micros)) + # type: (MicrosInstantRepresentation) -> Timestamp + return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) @LogicalType.register_logical_type From 7e0239a5eb57e89d67ac7cbcad5eb3b47a349387 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Thu, 29 Sep 2022 11:39:34 -0400 Subject: [PATCH 06/17] convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp --- .../python/apache_beam/io/gcp/bigquery_schema_tools.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 61ccd715f409..7d3573df7b7c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -27,6 +27,7 @@ import numpy as np import apache_beam as beam +import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -41,11 +42,14 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": datetime.datetime + datetime.datetime: apache_beam.utils.timestamp.Timestamp #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } +# Some BQ Types map are equivalent to Beam Types. +BIG_QUERY_TO_BEAM_TYPES = {"TIMESTAMP": datetime.datetime} + def generate_user_type_from_bq_schema(the_table_schema): #type: (bigquery.TableSchema) -> type @@ -62,6 +66,10 @@ def generate_user_type_from_bq_schema(the_table_schema): if the_schema == {}: raise ValueError("Encountered an empty schema") dict_of_tuples = [] + for i in range(len(the_schema['fields'])): + if the_schema['fields'][i]['type'] in BIG_QUERY_TO_BEAM_TYPES: + the_schema['fields'][i]['type'] = BIG_QUERY_TO_BEAM_TYPES[ + the_schema['fields'][i]['type']] for i in range(len(the_schema['fields'])): if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: typ = bq_field_to_type( From ed3b31798b63881e29f2ff1f39f3cd131692146f Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Thu, 29 Sep 2022 18:47:03 -0400 Subject: [PATCH 07/17] experiment with converting to datetime.datetime --- .../io/gcp/bigquery_schema_tools.py | 15 ++++----- sdks/python/apache_beam/typehints/schemas.py | 31 ------------------- 2 files changed, 8 insertions(+), 38 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 7d3573df7b7c..2077d6e0e07d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -27,7 +27,7 @@ import numpy as np import apache_beam as beam -import apache_beam.utils.timestamp +# import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -42,13 +42,13 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - datetime.datetime: apache_beam.utils.timestamp.Timestamp + "TIMESTAMP": datetime.datetime #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } # Some BQ Types map are equivalent to Beam Types. -BIG_QUERY_TO_BEAM_TYPES = {"TIMESTAMP": datetime.datetime} +#BIG_QUERY_TO_BEAM_TYPES = {"TIMESTAMP": datetime.datetime} def generate_user_type_from_bq_schema(the_table_schema): @@ -66,10 +66,10 @@ def generate_user_type_from_bq_schema(the_table_schema): if the_schema == {}: raise ValueError("Encountered an empty schema") dict_of_tuples = [] - for i in range(len(the_schema['fields'])): - if the_schema['fields'][i]['type'] in BIG_QUERY_TO_BEAM_TYPES: - the_schema['fields'][i]['type'] = BIG_QUERY_TO_BEAM_TYPES[ - the_schema['fields'][i]['type']] + #for i in range(len(the_schema['fields'])): + #if the_schema['fields'][i]['type'] in BIG_QUERY_TO_BEAM_TYPES: + #the_schema['fields'][i]['type'] = BIG_QUERY_TO_BEAM_TYPES[ + #the_schema['fields'][i]['type']] for i in range(len(the_schema['fields'])): if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: typ = bq_field_to_type( @@ -80,6 +80,7 @@ def generate_user_type_from_bq_schema(the_table_schema): f"an unsupported type: {the_schema['fields'][i]['type']!r}") # TODO svetaksundhar@: Map remaining BQ types dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) + #Timestamp here sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) return usertype diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 8923cb312429..4b312d96ce9f 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -55,7 +55,6 @@ # pytype: skip-file -from datetime import datetime from typing import Any from typing import ByteString from typing import Dict @@ -729,36 +728,6 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) -# Make sure MicrosInstant is registered after MillisInstant so that it -# overwrites the mapping of Timestamp language type representation choice and -# thus does not lose microsecond precision inside python sdk. -@LogicalType.register_logical_type -class MicrosInstantDatetime(NoArgumentLogicalType[datetime, - MicrosInstantRepresentation]): - """Microsecond-precision instant logical type that handles ``Timestamp``.""" - @classmethod - def urn(cls): - return common_urns.micros_instant.urn - - @classmethod - def representation_type(cls): - # type: () -> type - return MicrosInstantRepresentation - - @classmethod - def language_type(cls): - return datetime - - def to_representation_type(self, value): - # type: (datetime) -> MicrosInstantRepresentation - return MicrosInstantRepresentation( - value.microsecond // 1000000, value.microsecond % 1000000) - - def to_language_type(self, value): - # type: (MicrosInstantRepresentation) -> Timestamp - return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) - - @LogicalType.register_logical_type class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): """A logical type for PythonCallableSource objects.""" From 443ac88b769f14f39ab5601a369d465b38230b3d Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Fri, 30 Sep 2022 12:18:08 -0400 Subject: [PATCH 08/17] Timestamp to datetime.datetime mapping --- .../io/gcp/bigquery_read_it_test.py | 109 +++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index e8c88c3d8cf4..5accfa6c41fc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -260,15 +260,120 @@ def test_table_schema_retrieve_with_direct_read(self): @pytest.mark.it_postcommit def test_table_schema_retrieve_with_timestamp(self): + the_table = bigquery_tools.BigQueryWrapper().get_table( + project_id="apache-beam-testing", + dataset_id="beam_bigquery_io_test", + table_id="taxi_small") + table = the_table.schema + utype = bigquery_schema_tools. \ + generate_user_type_from_bq_schema(table) with beam.Pipeline(argv=self.args) as p: result = ( p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( gcs_location="gs://bqio_schema_test", dataset="beam_bigquery_io_test", - table="taxi", + table="taxi_small", project="apache-beam-testing", output_type='BEAM_ROW')) - assert_that(result, equal_to([])) + assert_that( + result, + equal_to([ + utype( + event_timestamp=datetime.datetime( + 2021, + 11, + 5, + 20, + 57, + 43, + 612000, + tzinfo=datetime.timezone.utc), + ride_id='0004b5de-8db1-425b-8eec-7a25b04a1ee0', + point_idx=9, + latitude=40.757380000000005, + timestamp='2021-11-05T16:57:43.60906-04:00', + meter_reading=0.37846154, + meter_increment=0.04205128, + ride_status='enroute', + passenger_count=1, + longitude=-73.98969000000001), + utype( + event_timestamp=datetime.datetime( + 2021, + 11, + 5, + 20, + 57, + 36, + 214000, + tzinfo=datetime.timezone.utc), + ride_id='0004b5de-8db1-425b-8eec-7a25b04a1ee0', + point_idx=6, + latitude=40.757130000000004, + timestamp='2021-11-05T16:57:34.92445-04:00', + meter_reading=0.25230768, + meter_increment=0.04205128, + ride_status='enroute', + passenger_count=1, + longitude=-73.98987000000001), + utype( + event_timestamp=datetime.datetime( + 2021, + 11, + 5, + 20, + 52, + 37, + 244000, + tzinfo=datetime.timezone.utc), + ride_id='00101404-c323-4eb6-9f74-f23f3317d905', + point_idx=531, + latitude=40.79684, + timestamp='2021-11-05T16:52:37.23995-04:00', + meter_reading=11.999221, + meter_increment=0.022597402, + ride_status='enroute', + passenger_count=1, + longitude=-73.92925000000001), + utype( + event_timestamp=datetime.datetime( + 2021, + 11, + 5, + 20, + 57, + 19, + 749000, + tzinfo=datetime.timezone.utc), + ride_id='00101404-c323-4eb6-9f74-f23f3317d905', + point_idx=702, + latitude=40.794500000000006, + timestamp='2021-11-05T16:55:52.61301-04:00', + meter_reading=15.863377, + meter_increment=0.022597402, + ride_status='enroute', + passenger_count=1, + longitude=-73.92292), + utype( + event_timestamp=datetime.datetime( + 2021, + 11, + 5, + 20, + 57, + 26, + 957000, + tzinfo=datetime.timezone.utc), + ride_id='00101404-c323-4eb6-9f74-f23f3317d905', + point_idx=714, + latitude=40.79345, + timestamp='2021-11-05T16:56:06.3234-04:00', + meter_reading=16.134544, + meter_increment=0.022597402, + ride_status='enroute', + passenger_count=1, + longitude=-73.92358) + ])) class ReadUsingStorageApiTests(BigQueryReadIntegrationTests): From 1c599866a09e069f10a8f3c0f53e40790db992f1 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Fri, 30 Sep 2022 13:00:03 -0400 Subject: [PATCH 09/17] Timestamp to datetime.datetime mapping --- .../apache_beam/io/gcp/bigquery_schema_tools.py | 12 ++---------- .../apache_beam/io/gcp/bigquery_schema_tools_test.py | 3 +-- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 2077d6e0e07d..5f3ca6098b05 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -47,9 +47,6 @@ # Finish mappings for all BQ types } -# Some BQ Types map are equivalent to Beam Types. -#BIG_QUERY_TO_BEAM_TYPES = {"TIMESTAMP": datetime.datetime} - def generate_user_type_from_bq_schema(the_table_schema): #type: (bigquery.TableSchema) -> type @@ -66,10 +63,6 @@ def generate_user_type_from_bq_schema(the_table_schema): if the_schema == {}: raise ValueError("Encountered an empty schema") dict_of_tuples = [] - #for i in range(len(the_schema['fields'])): - #if the_schema['fields'][i]['type'] in BIG_QUERY_TO_BEAM_TYPES: - #the_schema['fields'][i]['type'] = BIG_QUERY_TO_BEAM_TYPES[ - #the_schema['fields'][i]['type']] for i in range(len(the_schema['fields'])): if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: typ = bq_field_to_type( @@ -80,18 +73,17 @@ def generate_user_type_from_bq_schema(the_table_schema): f"an unsupported type: {the_schema['fields'][i]['type']!r}") # TODO svetaksundhar@: Map remaining BQ types dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) - #Timestamp here sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) return usertype def bq_field_to_type(field, mode): - if mode == 'NULLABLE': + if mode == 'NULLABLE' or mode is None or mode == '': return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] elif mode == 'REPEATED': return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] - elif mode is None or mode == '' or mode == 'REQUIRED': + elif mode == 'REQUIRED': return BIG_QUERY_TO_PYTHON_TYPES[field] else: raise ValueError(f"Encountered an unsupported mode: {mode!r}") diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 37ff5787e37e..d77e132543ca 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import typing import unittest.mock @@ -77,7 +76,7 @@ def test_check_schema_conversions_with_timestamp(self): { 'stn': typing.Optional[str], 'temp': typing.Sequence[np.float64], - 'times': typing.Optional[apache_beam.utils.timestamp.Timestamp] + 'times': typing.Any }) def test_unsupported_type(self): From 7f15ed8dd89abe66aebedffee4c66d87ac2bf7ee Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Fri, 30 Sep 2022 13:01:34 -0400 Subject: [PATCH 10/17] Timestamp to datetime.datetime mapping --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 1 - sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 5f3ca6098b05..d9bc96eaff2c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -27,7 +27,6 @@ import numpy as np import apache_beam as beam -# import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index d77e132543ca..7a10ae9d3bb3 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import logging import typing import unittest.mock From aa694470ea3c33395166b91a2cfe148f4cda59c7 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Fri, 30 Sep 2022 15:06:20 -0400 Subject: [PATCH 11/17] np fix --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 7a10ae9d3bb3..4144ddeecad4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -50,7 +50,7 @@ def test_check_schema_conversions(self): { 'stn': typing.Optional[str], 'temp': typing.Sequence[np.float64], - 'count': np.int64 + 'count': typing.Optional[np.int64] }) def test_check_conversion_with_empty_schema(self): From 6881243eec5b307dbb73b0905114547f463b1506 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Mon, 3 Oct 2022 15:44:50 -0400 Subject: [PATCH 12/17] apache_beam_utils.timestamp.Timestamp obj --- sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py | 6 +++--- sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index 5accfa6c41fc..9da6fc8ddf87 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -237,7 +237,7 @@ def test_table_schema_retrieve_with_direct_read(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="taxi") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -264,9 +264,9 @@ def test_table_schema_retrieve_with_timestamp(self): project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", table_id="taxi_small") - table = the_table.schema + schema = the_table.schema utype = bigquery_schema_tools. \ - generate_user_type_from_bq_schema(table) + generate_user_type_from_bq_schema(schema) with beam.Pipeline(argv=self.args) as p: result = ( p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index d9bc96eaff2c..ceb5dbc0d769 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -20,13 +20,14 @@ backwards compatibility guarantees. NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ -import datetime + from typing import Optional from typing import Sequence import numpy as np import apache_beam as beam +import apache_beam.utils.timestamp from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.portability.api import schema_pb2 @@ -41,7 +42,7 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": datetime.datetime + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp.from_utc_datetime #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } From 2c0566fc6489cff53f9e4066e9f74fc01ca356b8 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Mon, 3 Oct 2022 16:08:39 -0400 Subject: [PATCH 13/17] apache_beam_utils.timestamp.Timestamp obj --- sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index 9da6fc8ddf87..6bb277954bb9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -237,7 +237,7 @@ def test_table_schema_retrieve_with_direct_read(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="taxi") + table_id="dfsqltable_3c7d6fd5_16e0460dfd0") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) From 803bd2fc8e062a48fbe71d9a9c1c5a2280a34317 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Mon, 3 Oct 2022 18:07:41 -0400 Subject: [PATCH 14/17] fixed tests --- .../io/gcp/bigquery_read_it_test.py | 205 +++++++----------- 1 file changed, 74 insertions(+), 131 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index 6bb277954bb9..bd5defa2bad6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -185,7 +185,7 @@ def test_table_schema_retrieve(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -194,16 +194,36 @@ def test_table_schema_retrieve(self): p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( gcs_location="gs://bqio_schema_test", dataset="beam_bigquery_io_test", - table="dfsqltable_3c7d6fd5_16e0460dfd0", + table="table_schema_retrieve", project="apache-beam-testing", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') + utype( + id=1, + name='customer1', + type='test', + times=datetime.datetime( + 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=3, + name='customer1', + type='test', + times=datetime.datetime( + 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=2, + name='customer2', + type='test', + times=datetime.datetime( + 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=4, + name='customer2', + type='test', + times=datetime.datetime( + 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) ])) @pytest.mark.it_postcommit @@ -211,7 +231,7 @@ def test_table_schema_retrieve_specifying_only_table(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -221,15 +241,35 @@ def test_table_schema_retrieve_specifying_only_table(self): gcs_location="gs://bqio_schema_test", table="apache-beam-testing:" "beam_bigquery_io_test." - "dfsqltable_3c7d6fd5_16e0460dfd0", + "table_schema_retrieve", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') + utype( + id=1, + name='customer1', + type='test', + times=datetime.datetime( + 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=3, + name='customer1', + type='test', + times=datetime.datetime( + 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=2, + name='customer2', + type='test', + times=datetime.datetime( + 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + utype( + id=4, + name='customer2', + type='test', + times=datetime.datetime( + 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) ])) @pytest.mark.it_postcommit @@ -237,7 +277,7 @@ def test_table_schema_retrieve_with_direct_read(self): the_table = bigquery_tools.BigQueryWrapper().get_table( project_id="apache-beam-testing", dataset_id="beam_bigquery_io_test", - table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table_id="table_schema_retrieve") table = the_table.schema utype = bigquery_schema_tools.\ generate_user_type_from_bq_schema(table) @@ -247,132 +287,35 @@ def test_table_schema_retrieve_with_direct_read(self): method=beam.io.ReadFromBigQuery.Method.DIRECT_READ, table="apache-beam-testing:" "beam_bigquery_io_test." - "dfsqltable_3c7d6fd5_16e0460dfd0", + "table_schema_retrieve", output_type='BEAM_ROW')) assert_that( result, equal_to([ - utype(id=3, name='customer1', type='test'), - utype(id=1, name='customer1', type='test'), - utype(id=2, name='customer2', type='test'), - utype(id=4, name='customer2', type='test') - ])) - - @pytest.mark.it_postcommit - def test_table_schema_retrieve_with_timestamp(self): - the_table = bigquery_tools.BigQueryWrapper().get_table( - project_id="apache-beam-testing", - dataset_id="beam_bigquery_io_test", - table_id="taxi_small") - schema = the_table.schema - utype = bigquery_schema_tools. \ - generate_user_type_from_bq_schema(schema) - with beam.Pipeline(argv=self.args) as p: - result = ( - p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( - gcs_location="gs://bqio_schema_test", - dataset="beam_bigquery_io_test", - table="taxi_small", - project="apache-beam-testing", - output_type='BEAM_ROW')) - assert_that( - result, - equal_to([ - utype( - event_timestamp=datetime.datetime( - 2021, - 11, - 5, - 20, - 57, - 43, - 612000, - tzinfo=datetime.timezone.utc), - ride_id='0004b5de-8db1-425b-8eec-7a25b04a1ee0', - point_idx=9, - latitude=40.757380000000005, - timestamp='2021-11-05T16:57:43.60906-04:00', - meter_reading=0.37846154, - meter_increment=0.04205128, - ride_status='enroute', - passenger_count=1, - longitude=-73.98969000000001), utype( - event_timestamp=datetime.datetime( - 2021, - 11, - 5, - 20, - 57, - 36, - 214000, - tzinfo=datetime.timezone.utc), - ride_id='0004b5de-8db1-425b-8eec-7a25b04a1ee0', - point_idx=6, - latitude=40.757130000000004, - timestamp='2021-11-05T16:57:34.92445-04:00', - meter_reading=0.25230768, - meter_increment=0.04205128, - ride_status='enroute', - passenger_count=1, - longitude=-73.98987000000001), + id=1, + name='customer1', + type='test', + times=datetime.datetime( + 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), utype( - event_timestamp=datetime.datetime( - 2021, - 11, - 5, - 20, - 52, - 37, - 244000, - tzinfo=datetime.timezone.utc), - ride_id='00101404-c323-4eb6-9f74-f23f3317d905', - point_idx=531, - latitude=40.79684, - timestamp='2021-11-05T16:52:37.23995-04:00', - meter_reading=11.999221, - meter_increment=0.022597402, - ride_status='enroute', - passenger_count=1, - longitude=-73.92925000000001), + id=3, + name='customer1', + type='test', + times=datetime.datetime( + 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), utype( - event_timestamp=datetime.datetime( - 2021, - 11, - 5, - 20, - 57, - 19, - 749000, - tzinfo=datetime.timezone.utc), - ride_id='00101404-c323-4eb6-9f74-f23f3317d905', - point_idx=702, - latitude=40.794500000000006, - timestamp='2021-11-05T16:55:52.61301-04:00', - meter_reading=15.863377, - meter_increment=0.022597402, - ride_status='enroute', - passenger_count=1, - longitude=-73.92292), + id=2, + name='customer2', + type='test', + times=datetime.datetime( + 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), utype( - event_timestamp=datetime.datetime( - 2021, - 11, - 5, - 20, - 57, - 26, - 957000, - tzinfo=datetime.timezone.utc), - ride_id='00101404-c323-4eb6-9f74-f23f3317d905', - point_idx=714, - latitude=40.79345, - timestamp='2021-11-05T16:56:06.3234-04:00', - meter_reading=16.134544, - meter_increment=0.022597402, - ride_status='enroute', - passenger_count=1, - longitude=-73.92358) + id=4, + name='customer2', + type='test', + times=datetime.datetime( + 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) ])) From 72ca4ca46658b202374e1a48d58b41c301044401 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Tue, 18 Oct 2022 17:33:24 -0400 Subject: [PATCH 15/17] Timestamp conversion --- .../io/gcp/bigquery_read_it_test.py | 37 +++++++------------ .../io/gcp/bigquery_schema_tools.py | 6 ++- .../io/gcp/bigquery_schema_tools_test.py | 2 +- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index bd5defa2bad6..c36c3a28e30e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -44,6 +44,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils.timestamp import Timestamp # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -204,26 +205,22 @@ def test_table_schema_retrieve(self): id=1, name='customer1', type='test', - times=datetime.datetime( - 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1633262400)), utype( id=3, name='customer1', type='test', - times=datetime.datetime( - 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1664798400)), utype( id=2, name='customer2', type='test', - times=datetime.datetime( - 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1601726400)), utype( id=4, name='customer2', type='test', - times=datetime.datetime( - 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) + times=Timestamp(1570104000)) ])) @pytest.mark.it_postcommit @@ -250,26 +247,22 @@ def test_table_schema_retrieve_specifying_only_table(self): id=1, name='customer1', type='test', - times=datetime.datetime( - 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1633262400)), utype( id=3, name='customer1', type='test', - times=datetime.datetime( - 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1664798400)), utype( id=2, name='customer2', type='test', - times=datetime.datetime( - 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1601726400)), utype( id=4, name='customer2', type='test', - times=datetime.datetime( - 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) + times=Timestamp(1570104000)) ])) @pytest.mark.it_postcommit @@ -296,26 +289,22 @@ def test_table_schema_retrieve_with_direct_read(self): id=1, name='customer1', type='test', - times=datetime.datetime( - 2021, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1633262400)), utype( id=3, name='customer1', type='test', - times=datetime.datetime( - 2022, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1664798400)), utype( id=2, name='customer2', type='test', - times=datetime.datetime( - 2020, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)), + times=Timestamp(1601726400)), utype( id=4, name='customer2', type='test', - times=datetime.datetime( - 2019, 10, 3, 12, 0, tzinfo=datetime.timezone.utc)) + times=Timestamp(1570104000)) ])) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index ceb5dbc0d769..7c4c55e94f5a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -23,6 +23,7 @@ from typing import Optional from typing import Sequence +import datetime import numpy as np @@ -42,7 +43,7 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": apache_beam.utils.timestamp.Timestamp.from_utc_datetime + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } @@ -102,6 +103,9 @@ def __init__(self, pcoll_val_ctor): self._pcoll_val_ctor = pcoll_val_ctor def process(self, dict_of_tuples): + for k, v in dict_of_tuples.items(): + if isinstance(v, datetime.datetime): + dict_of_tuples[k] = beam.utils.timestamp.Timestamp.from_utc_datetime(v) yield self._pcoll_val_ctor(**dict_of_tuples) def infer_output_type(self, input_type): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 4144ddeecad4..dcd548d22d92 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -77,7 +77,7 @@ def test_check_schema_conversions_with_timestamp(self): { 'stn': typing.Optional[str], 'temp': typing.Sequence[np.float64], - 'times': typing.Any + 'times': typing.Optional[apache_beam.utils.timestamp.Timestamp] }) def test_unsupported_type(self): From 04969a7297e1387548c82be2e8aaf83317f4da60 Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Tue, 18 Oct 2022 17:42:51 -0400 Subject: [PATCH 16/17] Timestamp conversion with lint --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index 7c4c55e94f5a..d1a137812598 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -21,9 +21,9 @@ NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. """ +import datetime from typing import Optional from typing import Sequence -import datetime import numpy as np From 1b1549111cf98a4891f46e5528eaa8e7e32a6caa Mon Sep 17 00:00:00 2001 From: Svetak Sundhar Date: Tue, 18 Oct 2022 21:22:53 -0400 Subject: [PATCH 17/17] Timestamp conversion with lint --- sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index d1a137812598..e78f7bd5a7f7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -98,7 +98,6 @@ def convert_to_usertype(table_schema): class BeamSchemaConversionDoFn(beam.DoFn): - # Converting a dictionary of tuples to a usertype. def __init__(self, pcoll_val_ctor): self._pcoll_val_ctor = pcoll_val_ctor