Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ def process(self, element, schema_mod_job_name_prefix):
location=temp_table_load_job_reference.location)
temp_table_schema = temp_table_load_job.configuration.load.schema

# FIXME: This short-circuit lacks specificity. Schemas differing only in
# the order of fields are not equivalent according to == but do not
# need a schema modification job to precede the copy job.
if temp_table_schema == destination_table.schema:
if bigquery_tools.check_schema_equal(temp_table_schema,
destination_table.schema,
ignore_descriptions=True,
ignore_field_order=True):
# Destination table schema is already the same as the temp table schema,
# so no need to run a job to update the destination table schema.
return
Expand Down
73 changes: 73 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import uuid
from builtins import object
from json.decoder import JSONDecodeError
from typing import Union

import fastavro
from future.utils import iteritems
Expand Down Expand Up @@ -1652,3 +1653,75 @@ def generate_bq_job_name(job_name, step_id, job_type, random=None):
job_id=job_name.replace("-", ""),
step_id=step_id,
random=random)


def check_schema_equal(
    left, right, *, ignore_descriptions=False, ignore_field_order=False):
  # type: (Union[bigquery.TableSchema, bigquery.TableFieldSchema], Union[bigquery.TableSchema, bigquery.TableFieldSchema], bool, bool) -> bool

  """Check whether schemas are equivalent.

  This comparison function differs from using == to compare TableSchema
  because it ignores categories, policy tags, descriptions (optionally), and
  field ordering (optionally). Type aliases (e.g. ``INT64``/``INTEGER``) are
  treated as the same type, and an unset field mode is treated as
  ``NULLABLE``, which is the mode BigQuery assumes when none is given.

  Args:
    left (~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema, ~apache_beam.io.gcp.internal.clients.\
bigquery.bigquery_v2_messages.TableFieldSchema):
      One schema to compare.
    right (~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema, ~apache_beam.io.gcp.internal.clients.\
bigquery.bigquery_v2_messages.TableFieldSchema):
      The other schema to compare.
    ignore_descriptions (bool): (optional) Whether or not to ignore field
      descriptions when comparing. Defaults to False.
    ignore_field_order (bool): (optional) Whether or not to ignore struct field
      order when comparing. Defaults to False.

  Returns:
    bool: True if the schemas are equivalent, False otherwise. Arguments that
    are of different types, or that are neither a TableSchema nor a
    TableFieldSchema, are never equivalent.
  """
  # The exact-type comparison comes first on purpose: a TableSchema is never
  # equivalent to a TableFieldSchema, and mismatched inputs are rejected
  # before anything else is inspected.
  if type(left) != type(right) or not isinstance(
      left, (bigquery.TableSchema, bigquery.TableFieldSchema)):
    return False

  if isinstance(left, bigquery.TableFieldSchema):
    if left.name != right.name:
      return False

    if left.type != right.type:
      # Different spellings are still fine when they are aliases of the same
      # type. The pairs are unordered sets, so no ordering of the two values
      # is needed and a missing (None) type simply fails to match.
      type_aliases = (
          frozenset(("BOOL", "BOOLEAN")),
          frozenset(("FLOAT", "FLOAT64")),
          frozenset(("INT64", "INTEGER")),
          frozenset(("RECORD", "STRUCT")),
      )
      if frozenset((left.type, right.type)) not in type_aliases:
        return False

    # A field without an explicit mode is NULLABLE, so normalize both sides
    # before comparing; otherwise a schema that omits the mode would be
    # reported as different from the same schema as returned by the service.
    if (left.mode or "NULLABLE") != (right.mode or "NULLABLE"):
      return False

    if not ignore_descriptions and left.description != right.description:
      return False

  # Only table schemas and struct-typed fields carry nested fields to compare.
  if isinstance(left,
                bigquery.TableSchema) or left.type in ("RECORD", "STRUCT"):
    if len(left.fields) != len(right.fields):
      return False

    if ignore_field_order:
      # Sorting by name pairs each field with its same-named counterpart.
      left_fields = sorted(left.fields, key=lambda field: field.name)
      right_fields = sorted(right.fields, key=lambda field: field.name)
    else:
      left_fields = left.fields
      right_fields = right.fields

    return all(
        check_schema_equal(
            left_field,
            right_field,
            ignore_descriptions=ignore_descriptions,
            ignore_field_order=ignore_field_order) for left_field,
        right_field in zip(left_fields, right_fields))

  return True
83 changes: 83 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes
from apache_beam.io.gcp.bigquery_tools import JsonRowWriter
from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder
from apache_beam.io.gcp.bigquery_tools import check_schema_equal
from apache_beam.io.gcp.bigquery_tools import generate_bq_job_name
from apache_beam.io.gcp.bigquery_tools import parse_table_reference
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
Expand Down Expand Up @@ -1013,6 +1014,88 @@ def test_matches_template(self):
self.assertRegex(job_name, base_pattern)


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestCheckSchemaEqual(unittest.TestCase):
  """Unit tests for check_schema_equal."""
  def test_simple_schemas(self):
    """A schema equals itself; structurally different schemas do not match."""
    empty = bigquery.TableSchema(fields=[])
    self.assertTrue(check_schema_equal(empty, empty))

    int_field = bigquery.TableFieldSchema(
        name="a", mode="NULLABLE", type="INT64")
    flat = bigquery.TableSchema(fields=[int_field])
    self.assertTrue(check_schema_equal(flat, flat))
    self.assertFalse(check_schema_equal(empty, flat))

    # A repeated record wrapping a single required boolean.
    inner_field = bigquery.TableFieldSchema(
        name="c", mode="REQUIRED", type="BOOL")
    record_field = bigquery.TableFieldSchema(
        name="b", mode="REPEATED", type="RECORD", fields=[inner_field])
    nested = bigquery.TableSchema(fields=[record_field])
    self.assertTrue(check_schema_equal(nested, nested))
    self.assertFalse(check_schema_equal(flat, nested))

  def test_field_order(self):
    """Test that field order is ignored when ignore_field_order=True."""
    forward = bigquery.TableSchema(
        fields=[
            bigquery.TableFieldSchema(
                name="a", mode="REQUIRED", type="FLOAT64"),
            bigquery.TableFieldSchema(name="b", mode="REQUIRED", type="INT64"),
        ])

    # Same field objects, opposite order.
    backward_fields = list(reversed(forward.fields))
    backward = bigquery.TableSchema(fields=backward_fields)

    strict_result = check_schema_equal(forward, backward)
    self.assertFalse(strict_result)
    relaxed_result = check_schema_equal(
        forward, backward, ignore_field_order=True)
    self.assertTrue(relaxed_result)

  def test_descriptions(self):
    """
    Test that differences in description are ignored
    when ignore_descriptions=True.
    """
    # Field "a" has a description here; field "b" has none.
    sparse_a = bigquery.TableFieldSchema(
        name="a", mode="REQUIRED", type="FLOAT64", description="Field A")
    sparse_b = bigquery.TableFieldSchema(
        name="b", mode="REQUIRED", type="INT64")
    sparsely_described = bigquery.TableSchema(fields=[sparse_a, sparse_b])

    # Same fields, but "a" is described differently and "b" gains a
    # description.
    full_a = bigquery.TableFieldSchema(
        name="a",
        mode="REQUIRED",
        type="FLOAT64",
        description="Field A is for Apple")
    full_b = bigquery.TableFieldSchema(
        name="b", mode="REQUIRED", type="INT64", description="Field B")
    fully_described = bigquery.TableSchema(fields=[full_a, full_b])

    strict_result = check_schema_equal(sparsely_described, fully_described)
    self.assertFalse(strict_result)
    relaxed_result = check_schema_equal(
        sparsely_described, fully_described, ignore_descriptions=True)
    self.assertTrue(relaxed_result)


if __name__ == '__main__':
  # Surface INFO-level log output while the tests run as a script.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
2 changes: 2 additions & 0 deletions sdks/python/scripts/generate_pydoc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ ignore_identifiers = [
'apache_beam.io.gcp.bigquery.RowAsDictJsonCoder',
'apache_beam.io.gcp.datastore.v1new.datastoreio._Mutate',
'apache_beam.io.gcp.datastore.v1new.datastoreio.DatastoreMutateFn',
'apache_beam.io.gcp.internal.clients.bigquery.'
'bigquery_v2_messages.TableFieldSchema',
'apache_beam.io.gcp.internal.clients.bigquery.'
'bigquery_v2_messages.TableSchema',
'apache_beam.io.iobase.SourceBase',
Expand Down