Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 67 additions & 18 deletions sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.utils.timestamp import Timestamp

# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down Expand Up @@ -185,7 +186,7 @@ def test_table_schema_retrieve(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table_id="table_schema_retrieve")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
Expand All @@ -194,24 +195,40 @@ def test_table_schema_retrieve(self):
p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
gcs_location="gs://bqio_schema_test",
dataset="beam_bigquery_io_test",
table="dfsqltable_3c7d6fd5_16e0460dfd0",
table="table_schema_retrieve",
project="apache-beam-testing",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
utype(
id=1,
name='customer1',
type='test',
times=Timestamp(1633262400)),
utype(
id=3,
name='customer1',
type='test',
times=Timestamp(1664798400)),
utype(
id=2,
name='customer2',
type='test',
times=Timestamp(1601726400)),
utype(
id=4,
name='customer2',
type='test',
times=Timestamp(1570104000))
]))

@pytest.mark.it_postcommit
def test_table_schema_retrieve_specifying_only_table(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table_id="table_schema_retrieve")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
Expand All @@ -221,23 +238,39 @@ def test_table_schema_retrieve_specifying_only_table(self):
gcs_location="gs://bqio_schema_test",
table="apache-beam-testing:"
"beam_bigquery_io_test."
"dfsqltable_3c7d6fd5_16e0460dfd0",
"table_schema_retrieve",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
utype(
id=1,
name='customer1',
type='test',
times=Timestamp(1633262400)),
utype(
id=3,
name='customer1',
type='test',
times=Timestamp(1664798400)),
utype(
id=2,
name='customer2',
type='test',
times=Timestamp(1601726400)),
utype(
id=4,
name='customer2',
type='test',
times=Timestamp(1570104000))
]))

@pytest.mark.it_postcommit
def test_table_schema_retrieve_with_direct_read(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table_id="table_schema_retrieve")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
Expand All @@ -247,15 +280,31 @@ def test_table_schema_retrieve_with_direct_read(self):
method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
table="apache-beam-testing:"
"beam_bigquery_io_test."
"dfsqltable_3c7d6fd5_16e0460dfd0",
"table_schema_retrieve",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
utype(
id=1,
name='customer1',
type='test',
times=Timestamp(1633262400)),
utype(
id=3,
name='customer1',
type='test',
times=Timestamp(1664798400)),
utype(
id=2,
name='customer2',
type='test',
times=Timestamp(1601726400)),
utype(
id=4,
name='customer2',
type='test',
times=Timestamp(1570104000))
]))


Expand Down
12 changes: 9 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
"""

import datetime
from typing import Optional
from typing import Sequence

import numpy as np

import apache_beam as beam
import apache_beam.utils.timestamp
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.portability.api import schema_pb2

Expand All @@ -38,8 +40,10 @@
"STRING": str,
"INTEGER": np.int64,
"FLOAT64": np.float64,
"FLOAT": np.float64,
"BOOLEAN": bool,
"BYTES": bytes,
"TIMESTAMP": apache_beam.utils.timestamp.Timestamp
#TODO(https://github.com/apache/beam/issues/20810):
# Finish mappings for all BQ types
}
Expand Down Expand Up @@ -76,11 +80,11 @@ def generate_user_type_from_bq_schema(the_table_schema):


def bq_field_to_type(field, mode):
if mode == 'NULLABLE':
if mode == 'NULLABLE' or mode is None or mode == '':
return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
elif mode == 'REPEATED':
return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
elif mode is None or mode == '':
elif mode == 'REQUIRED':
return BIG_QUERY_TO_PYTHON_TYPES[field]
else:
raise ValueError(f"Encountered an unsupported mode: {mode!r}")
Expand All @@ -94,11 +98,13 @@ def convert_to_usertype(table_schema):


class BeamSchemaConversionDoFn(beam.DoFn):
# Converting a dictionary of tuples to a usertype.
def __init__(self, pcoll_val_ctor):
self._pcoll_val_ctor = pcoll_val_ctor

def process(self, dict_of_tuples):
for k, v in dict_of_tuples.items():
if isinstance(v, datetime.datetime):
dict_of_tuples[k] = beam.utils.timestamp.Timestamp.from_utc_datetime(v)
yield self._pcoll_val_ctor(**dict_of_tuples)

def infer_output_type(self, input_type):
Expand Down
21 changes: 20 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_check_schema_conversions(self):
{
'stn': typing.Optional[str],
'temp': typing.Sequence[np.float64],
'count': np.int64
'count': typing.Optional[np.int64]
})

def test_check_conversion_with_empty_schema(self):
Expand All @@ -61,6 +61,25 @@ def test_check_conversion_with_empty_schema(self):
the_table_schema=schema)
self.assertEqual(usertype.__annotations__, {})

def test_check_schema_conversions_with_timestamp(self):
fields = [
bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(
name='times', type='TIMESTAMP', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)

usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)
self.assertEqual(
usertype.__annotations__,
{
'stn': typing.Optional[str],
'temp': typing.Sequence[np.float64],
'times': typing.Optional[apache_beam.utils.timestamp.Timestamp]
})

def test_unsupported_type(self):
fields = [
bigquery.TableFieldSchema(
Expand Down