diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index ee0cbd714a2d..278ae2d98a60 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -403,10 +403,10 @@ def process(self, element, schema_mod_job_name_prefix):
         location=temp_table_load_job_reference.location)
     temp_table_schema = temp_table_load_job.configuration.load.schema
 
-    # FIXME: This short-circuit lacks specificity. Schemas differing only in
-    # the order of fields are not equivalent according to == but do not
-    # need a schema modification job to precede the copy job.
-    if temp_table_schema == destination_table.schema:
+    if bigquery_tools.check_schema_equal(temp_table_schema,
+                                         destination_table.schema,
+                                         ignore_descriptions=True,
+                                         ignore_field_order=True):
       # Destination table schema is already the same as the temp table schema,
       # so no need to run a job to update the destination table schema.
       return
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index cad6831973bb..041af360b027 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -39,6 +39,7 @@
 import uuid
 from builtins import object
 from json.decoder import JSONDecodeError
+from typing import Union
 
 import fastavro
 from future.utils import iteritems
@@ -1652,3 +1653,86 @@ def generate_bq_job_name(job_name, step_id, job_type, random=None):
       job_id=job_name.replace("-", ""),
       step_id=step_id,
       random=random)
+
+
+# Legacy SQL type names mapped to their standard SQL equivalents so that
+# aliases of the same BigQuery type compare equal.
+_BQ_TYPE_ALIASES = {
+    "BOOLEAN": "BOOL",
+    "FLOAT": "FLOAT64",
+    "INTEGER": "INT64",
+    "RECORD": "STRUCT",
+}
+
+
+def check_schema_equal(
+    left, right, *, ignore_descriptions=False, ignore_field_order=False):
+  # type: (Union[bigquery.TableSchema, bigquery.TableFieldSchema], Union[bigquery.TableSchema, bigquery.TableFieldSchema], bool, bool) -> bool
+
+  """Check whether schemas are equivalent.
+
+  This comparison function differs from using == to compare TableSchema
+  because it ignores categories, policy tags, descriptions (optionally), and
+  field ordering (optionally). Aliased type names (for example INTEGER and
+  INT64) compare equal, and an unset field mode is treated as NULLABLE, the
+  mode BigQuery applies when none is given.
+
+  Args:
+    left (~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema, ~apache_beam.io.gcp.internal.clients.\
+bigquery.bigquery_v2_messages.TableFieldSchema):
+      One schema to compare.
+    right (~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema, ~apache_beam.io.gcp.internal.clients.\
+bigquery.bigquery_v2_messages.TableFieldSchema):
+      The other schema to compare.
+    ignore_descriptions (bool): (optional) Whether or not to ignore field
+      descriptions when comparing. Defaults to False.
+    ignore_field_order (bool): (optional) Whether or not to ignore struct field
+      order when comparing. Defaults to False.
+
+  Returns:
+    bool: True if the schemas are equivalent, False otherwise.
+  """
+  if type(left) != type(right) or not isinstance(
+      left, (bigquery.TableSchema, bigquery.TableFieldSchema)):
+    return False
+
+  if isinstance(left, bigquery.TableFieldSchema):
+    if left.name != right.name:
+      return False
+
+    # Normalize aliases through a lookup instead of sorting the pair of type
+    # names: sorting raises TypeError when one of the two types is unset.
+    left_type = _BQ_TYPE_ALIASES.get(left.type, left.type)
+    right_type = _BQ_TYPE_ALIASES.get(right.type, right.type)
+    if left_type != right_type:
+      return False
+
+    # A field that carries no explicit mode is NULLABLE.
+    if (left.mode or "NULLABLE") != (right.mode or "NULLABLE"):
+      return False
+
+    if not ignore_descriptions and left.description != right.description:
+      return False
+
+  if isinstance(left,
+                bigquery.TableSchema) or left.type in ("RECORD", "STRUCT"):
+    if len(left.fields) != len(right.fields):
+      return False
+
+    if ignore_field_order:
+      left_fields = sorted(left.fields, key=lambda field: field.name)
+      right_fields = sorted(right.fields, key=lambda field: field.name)
+    else:
+      left_fields = left.fields
+      right_fields = right.fields
+
+    for left_field, right_field in zip(left_fields, right_fields):
+      if not check_schema_equal(left_field,
+                                right_field,
+                                ignore_descriptions=ignore_descriptions,
+                                ignore_field_order=ignore_field_order):
+        return False
+
+  return True
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 558c8738dd44..ac5fdb726031 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -44,6 +44,7 @@
 from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes
 from apache_beam.io.gcp.bigquery_tools import JsonRowWriter
 from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder
+from apache_beam.io.gcp.bigquery_tools import check_schema_equal
 from apache_beam.io.gcp.bigquery_tools import generate_bq_job_name
 from apache_beam.io.gcp.bigquery_tools import parse_table_reference
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
@@ -1013,6 +1014,112 @@ def test_matches_template(self):
     self.assertRegex(job_name, base_pattern)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestCheckSchemaEqual(unittest.TestCase):
+  def test_simple_schemas(self):
+    schema1 = bigquery.TableSchema(fields=[])
+    self.assertTrue(check_schema_equal(schema1, schema1))
+
+    schema2 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(name="a", mode="NULLABLE", type="INT64")
+        ])
+    self.assertTrue(check_schema_equal(schema2, schema2))
+    self.assertFalse(check_schema_equal(schema1, schema2))
+
+    schema3 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name="b",
+                mode="REPEATED",
+                type="RECORD",
+                fields=[
+                    bigquery.TableFieldSchema(
+                        name="c", mode="REQUIRED", type="BOOL")
+                ])
+        ])
+    self.assertTrue(check_schema_equal(schema3, schema3))
+    self.assertFalse(check_schema_equal(schema2, schema3))
+
+  def test_field_order(self):
+    """Test that field order is ignored when ignore_field_order=True."""
+    schema1 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name="a", mode="REQUIRED", type="FLOAT64"),
+            bigquery.TableFieldSchema(name="b", mode="REQUIRED", type="INT64"),
+        ])
+
+    schema2 = bigquery.TableSchema(fields=list(reversed(schema1.fields)))
+
+    self.assertFalse(check_schema_equal(schema1, schema2))
+    self.assertTrue(
+        check_schema_equal(schema1, schema2, ignore_field_order=True))
+
+  def test_descriptions(self):
+    """
+    Test that differences in description are ignored
+    when ignore_descriptions=True.
+    """
+    schema1 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name="a",
+                mode="REQUIRED",
+                type="FLOAT64",
+                description="Field A",
+            ),
+            bigquery.TableFieldSchema(
+                name="b",
+                mode="REQUIRED",
+                type="INT64",
+            ),
+        ])
+
+    schema2 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name="a",
+                mode="REQUIRED",
+                type="FLOAT64",
+                description="Field A is for Apple"),
+            bigquery.TableFieldSchema(
+                name="b",
+                mode="REQUIRED",
+                type="INT64",
+                description="Field B",
+            ),
+        ])
+
+    self.assertFalse(check_schema_equal(schema1, schema2))
+    self.assertTrue(
+        check_schema_equal(schema1, schema2, ignore_descriptions=True))
+
+  def test_type_aliases_and_default_mode(self):
+    """Test that aliased types and an unset mode compare as equivalent."""
+    schema1 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(name="a", type="INTEGER"),
+            bigquery.TableFieldSchema(name="b", mode="NULLABLE", type="FLOAT"),
+        ])
+
+    schema2 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(name="a", mode="NULLABLE", type="INT64"),
+            bigquery.TableFieldSchema(name="b", type="FLOAT64"),
+        ])
+
+    self.assertTrue(check_schema_equal(schema1, schema2))
+
+    schema3 = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(name="a", mode="NULLABLE", type="INT64"),
+            bigquery.TableFieldSchema(name="b"),
+        ])
+
+    self.assertFalse(check_schema_equal(schema2, schema3))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 55e2ddc2a51d..14b711624838 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -154,6 +154,8 @@ ignore_identifiers = [
   'apache_beam.io.gcp.bigquery.RowAsDictJsonCoder',
   'apache_beam.io.gcp.datastore.v1new.datastoreio._Mutate',
   'apache_beam.io.gcp.datastore.v1new.datastoreio.DatastoreMutateFn',
+  'apache_beam.io.gcp.internal.clients.bigquery.'
+      'bigquery_v2_messages.TableFieldSchema',
   'apache_beam.io.gcp.internal.clients.bigquery.'
       'bigquery_v2_messages.TableSchema',
   'apache_beam.io.iobase.SourceBase',