From 6a79549b158ab046484f831b5ac227b34afdf4ef Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 15:17:06 +0000 Subject: [PATCH 01/15] add more test coverage for nullable field for BQ --- .../extended_tests/databases/bigquery.yaml | 79 +++++++++++++++++-- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml index f5ab31b3855b..d0357e098bf3 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml @@ -16,11 +16,20 @@ # fixtures: - - name: BQ_TABLE + - name: BQ_TABLE_0 type: "apache_beam.yaml.integration_tests.temp_bigquery_table" config: project: "apache-beam-testing" - - name: TEMP_DIR + - name: TEMP_DIR_0 + # Need distributed filesystem to be able to read and write from a container. + type: "apache_beam.yaml.integration_tests.gcs_temp_dir" + config: + bucket: "gs://temp-storage-for-end-to-end-tests/temp-it" + - name: BQ_TABLE_1 + type: "apache_beam.yaml.integration_tests.temp_bigquery_table" + config: + project: "apache-beam-testing" + - name: TEMP_DIR_1 # Need distributed filesystem to be able to read and write from a container. type: "apache_beam.yaml.integration_tests.gcs_temp_dir" config: @@ -38,17 +47,17 @@ pipelines: - {label: "389a", rank: 2} - type: WriteToBigQuery config: - table: "{BQ_TABLE}" + table: "{BQ_TABLE_0}" options: project: "apache-beam-testing" - temp_location: "{TEMP_DIR}" + temp_location: "{TEMP_DIR_0}" - pipeline: type: chain transforms: - type: ReadFromBigQuery config: - table: "{BQ_TABLE}" + table: "{BQ_TABLE_0}" - type: AssertEqual config: elements: @@ -57,14 +66,14 @@ pipelines: - {label: "389a", rank: 2} options: project: "apache-beam-testing" - temp_location: "{TEMP_DIR}" + temp_location: "{TEMP_DIR_0}" - pipeline: type: chain transforms: - type: ReadFromBigQuery config: - table: "{BQ_TABLE}" + table: "{BQ_TABLE_0}" fields: ["label"] row_restriction: "rank > 0" - type: AssertEqual @@ -74,4 +83,58 @@ pipelines: - {label: "389a"} options: project: "apache-beam-testing" - temp_location: "{TEMP_DIR}" + temp_location: "{TEMP_DIR_0}" + + # ---------------------------------------------------------------------------- + + # New write to verify row restriction based on Timestamp and nullability + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {label: "4a", rank: 3, timestamp: "2024-07-14 00:00:00 UTC"} + - {label: "5a", rank: 4} + - {label: "6a", rank: 5, timestamp: "2024-07-14T02:00:00.123Z"} + - type: WriteToBigQuery + config: + table: "{BQ_TABLE_1}" + + # New read from BQ to verify row restriction with nullable field and filter + # out nullable record + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE_1}" + fields: ["label","rank","timestamp"] + row_restriction: "TIMESTAMP(timestamp) <= TIMESTAMP_SUB('2025-07-14 04:00:00', INTERVAL 4 HOUR)" + - type: AssertEqual + config: + elements: + - {label: "4a", rank: 3, timestamp: "2024-07-14 00:00:00 UTC"} + - {label: "6a", rank: 5,timestamp: "2024-07-14T02:00:00.123Z"} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_1}" + + # New read from BQ to verify row restriction with nullable field and keep + # nullable record + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE_1}" + fields: ["timestamp", "label", "rank"] + row_restriction: "timestamp is NULL" + - type: AssertEqual + config: + elements: + - {label: "5a", rank: 4} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_1}" + From 2a57997ed08443aee03d7cdf85a0b95723dc81bb Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 15:18:42 +0000 Subject: [PATCH 02/15] yapf changes --- sdks/python/apache_beam/transforms/core.py | 44 ++++++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 6e0170c04ea7..a187bf6f93e1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -72,6 +72,7 @@ from apache_beam.typehints.typehints import visit_inner_types from apache_beam.utils import urns from apache_beam.utils.timestamp import Duration +from collections import defaultdict if typing.TYPE_CHECKING: from google.protobuf import message # pylint: disable=ungrouped-imports @@ -3962,9 +3963,46 @@ def to_runner_api_parameter(self, context): def infer_output_type(self, unused_input_type): if not self.values: return typehints.Any - return typehints.Union[[ - trivial_inference.instance_to_type(v) for v in self.values - ]] + + try: + if not all(isinstance(v, pvalue.Row) for v in self.values): + raise TypeError("All data must be Rows.") + first_fields = self.values[0].as_dict().keys() + if not all(v.as_dict().keys() == first_fields for v in self.values): + raise TypeError("All rows must have the same fields.") + except (TypeError, AttributeError): + # For non-Row or inconsistent rows. + return typehints.Union[[ + trivial_inference.instance_to_type(v) for v in self.values + ]] + + # Save field types for each field + field_types_by_field = defaultdict(set) + for row in self.values: + row_dict = row.as_dict() + for field in first_fields: + field_types_by_field[field].add( + trivial_inference.instance_to_type(row_dict.get(field))) + + # Determine the appropriate type for each field + final_fields = [] + for field in first_fields: + field_types = field_types_by_field[field] + non_none_types = {t for t in field_types if t is not type(None)} + + if len(non_none_types) > 1: + raise TypeError( + "Multiple types found for field %s: %s", field, non_none_types) + elif len(non_none_types) == 1 and len(field_types) == 1: + final_type = non_none_types.pop() + elif len(non_none_types) == 1 and len(field_types) == 2: + final_type = typing.Optional[non_none_types.pop()] + else: + raise TypeError("No types found for row: %s", self.values) + + final_fields.append((field, final_type)) + + return row_type.RowTypeConstraint.from_fields(final_fields) def get_output_type(self): return ( From a1b686219114b0b124d7da083757884b29e0e839 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 15:45:35 +0000 Subject: [PATCH 03/15] change else statement to field types instead of row --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a187bf6f93e1..8faf8ae2cce9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3997,8 +3997,8 @@ def infer_output_type(self, unused_input_type): final_type = non_none_types.pop() elif len(non_none_types) == 1 and len(field_types) == 2: final_type = typing.Optional[non_none_types.pop()] - else: - raise TypeError("No types found for row: %s", self.values) + else: # No available field types + raise TypeError("No types found for field %s", field) final_fields.append((field, final_type)) From 6e754988370f3423da7fbedd213471817a208bd1 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 16:48:34 +0000 Subject: [PATCH 04/15] remove error handling as not needed for now --- .../yaml/tests/assign_timestamps.yaml | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml index edaa581214ea..81c6c311fb5e 100644 --- a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml +++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml @@ -49,36 +49,3 @@ pipelines: - {user: "alice", timestamp: 3} - {user: "bob", timestamp: 7} - # Assign timestamp to beam row element with error_handling - - pipeline: - type: composite - transforms: - - type: Create - name: CreateVisits - config: - elements: - - {user: alice, timestamp: "not-valid"} - - {user: bob, timestamp: 3} - - type: AssignTimestamps - input: CreateVisits - config: - timestamp: timestamp - error_handling: - output: invalid_rows - - type: MapToFields - input: AssignTimestamps.invalid_rows - config: - language: python - fields: - user: "element.user" - timestamp: "element.timestamp" - - type: AssertEqual - input: MapToFields - config: - elements: - - {user: "alice", timestamp: "not-valid"} - - type: AssertEqual - input: AssignTimestamps - config: - elements: - - {user: bob, timestamp: 3} From b46336cf9f919d6d479631c5da4ae2ca3473529a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 16:48:52 +0000 Subject: [PATCH 05/15] fix lint issues --- sdks/python/apache_beam/transforms/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 8faf8ae2cce9..50f455dc1580 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -29,6 +29,7 @@ import traceback import types import typing +from collections import defaultdict from itertools import dropwhile from apache_beam import coders @@ -72,7 +73,6 @@ from apache_beam.typehints.typehints import visit_inner_types from apache_beam.utils import urns from apache_beam.utils.timestamp import Duration -from collections import defaultdict if typing.TYPE_CHECKING: from google.protobuf import message # pylint: disable=ungrouped-imports From 45daa2d06daba124b02e8ee37ae9c3584ab96960 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 25 Jul 2025 17:42:23 +0000 Subject: [PATCH 06/15] comment out failing create transforms --- .../yaml/tests/assign_timestamps.yaml | 35 +++++++ .../yaml/tests/validate_with_schema.yaml | 96 ++++++++++--------- 2 files changed, 84 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml index 81c6c311fb5e..572fc3de892e 100644 --- a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml +++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml @@ -49,3 +49,38 @@ pipelines: - {user: "alice", timestamp: 3} - {user: "bob", timestamp: 7} + # TODO - Rethink error handling since schema mismatch in Create transform + # isn't allowed anymore. + # Assign timestamp to beam row element with error_handling + # - pipeline: + # type: composite + # transforms: + # - type: Create + # name: CreateVisits + # config: + # elements: + # - {user: alice, timestamp: "not-valid"} + # - {user: bob, timestamp: 3} + # - type: AssignTimestamps + # input: CreateVisits + # config: + # timestamp: timestamp + # error_handling: + # output: invalid_rows + # - type: MapToFields + # input: AssignTimestamps.invalid_rows + # config: + # language: python + # fields: + # user: "element.user" + # timestamp: "element.timestamp" + # - type: AssertEqual + # input: MapToFields + # config: + # elements: + # - {user: "alice", timestamp: "not-valid"} + # - type: AssertEqual + # input: AssignTimestamps + # config: + # elements: + # - {user: bob, timestamp: 3} diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml index d5ae57a3e8c1..04c744ad1a17 100644 --- a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml +++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml @@ -43,53 +43,55 @@ pipelines: - {name: "Alice", age: 30, score: 95.5} - {name: "Bob", age: 25, score: 88.0} + # TODO - Rethink error handling since schema mismatch in Create transform + # isn't allowed anymore. # Validate a Beam Row with a predefined schema with error handling - - pipeline: - type: composite - transforms: - - type: Create - config: - elements: - - {name: "Alice", age: 30, score: 95.5} - - {name: "Bob", age: 25, score: 88.0} - - {name: "Charlie", age: 27, score: "apple"} - - {name: "David", age: "twenty", score: 90.0} - - {name: 30, age: 40, score: 100.0} - - type: ValidateWithSchema - input: Create - config: - schema: - type: object - properties: - name: - type: string - age: - type: integer - score: - type: number - required: [name, age, score] - error_handling: - output: invalid_rows - - type: MapToFields - input: ValidateWithSchema.invalid_rows - config: - language: python - fields: - name: "element.name" - age: "element.age" - score: "element.score" - - type: AssertEqual - input: MapToFields - config: - elements: - - {name: "Charlie", age: 27, score: "apple"} - - {name: "David", age: "twenty", score: 90.0} - - {name: 30, age: 40, score: 100.0} - - type: AssertEqual - input: ValidateWithSchema - config: - elements: - - {name: "Alice", age: 30, score: 95.5} - - {name: "Bob", age: 25, score: 88.0} + # - pipeline: + # type: composite + # transforms: + # - type: Create + # config: + # elements: + # - {name: "Alice", age: 30, score: 95.5} + # - {name: "Bob", age: 25, score: 88.0} + # - {name: "Charlie", age: 27, score: "apple"} + # - {name: "David", age: "twenty", score: 90.0} + # - {name: 30, age: 40, score: 100.0} + # - type: ValidateWithSchema + # input: Create + # config: + # schema: + # type: object + # properties: + # name: + # type: string + # age: + # type: integer + # score: + # type: number + # required: [name, age, score] + # error_handling: + # output: invalid_rows + # - type: MapToFields + # input: ValidateWithSchema.invalid_rows + # config: + # language: python + # fields: + # name: "element.name" + # age: "element.age" + # score: "element.score" + # - type: AssertEqual + # input: MapToFields + # config: + # elements: + # - {name: "Charlie", age: 27, score: "apple"} + # - {name: "David", age: "twenty", score: 90.0} + # - {name: 30, age: 40, score: 100.0} + # - type: AssertEqual + # input: ValidateWithSchema + # config: + # elements: + # - {name: "Alice", age: 30, score: 95.5} + # - {name: "Bob", age: 25, score: 88.0} From feb4f8efa6ea10e05792612221e68e20be2fec9b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 27 Jul 2025 01:08:22 +0000 Subject: [PATCH 07/15] address comments --- sdks/python/apache_beam/transforms/core.py | 2 -- .../python/apache_beam/yaml/tests/create.yaml | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 50f455dc1580..0941d64ac4a0 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3965,8 +3965,6 @@ def infer_output_type(self, unused_input_type): return typehints.Any try: - if not all(isinstance(v, pvalue.Row) for v in self.values): - raise TypeError("All data must be Rows.") first_fields = self.values[0].as_dict().keys() if not all(v.as_dict().keys() == first_fields for v in self.values): raise TypeError("All rows must have the same fields.") diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml index 723d8a888c26..30f276671874 100644 --- a/sdks/python/apache_beam/yaml/tests/create.yaml +++ b/sdks/python/apache_beam/yaml/tests/create.yaml @@ -81,3 +81,37 @@ pipelines: - {sdk: MapReduce, year: 2004} - {sdk: Flume} - {sdk: MillWheel, year: 2008} + + # Simple Create with explicit null value + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: Flume, year: null} + - {sdk: MillWheel, year: 2008} + - type: AssertEqual + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: Flume, year: null} + - {sdk: MillWheel, year: 2008} + + # Simple Create with explicit null values for the entire record + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: null, year: null} + - {sdk: MillWheel, year: 2008} + - type: AssertEqual + config: + elements: + - {sdk: MapReduce, year: 2004} + - {} + - {sdk: MillWheel, year: 2008} From 6cb318c7c18b8cc79e14411bfc1afb057b11d3e5 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 27 Jul 2025 12:10:47 +0000 Subject: [PATCH 08/15] add core create UT --- .../apache_beam/transforms/core_test.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 542544bce3c1..fd29381aa9d7 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -21,6 +21,7 @@ import logging import os import tempfile +import typing import unittest from typing import TypeVar @@ -30,6 +31,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.window import FixedWindows +from apache_beam.typehints import row_type from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints @@ -322,6 +324,33 @@ def test_typecheck_with_default(self): | beam.Map(lambda s: s.upper()).with_input_types(str)) +class CreateInferOutputSchemaTest(unittest.TestCase): + def test_multiple_types_for_field_raises_error(self): + with self.assertRaisesRegex( + TypeError, + "('Multiple types found for field %s: %s', 'a', " + + "{, })"): + beam.Create([beam.Row(a=1), beam.Row(a='foo')]).infer_output_type(None) + + def test_single_type_for_field(self): + output_type = beam.Create([beam.Row(a=1), + beam.Row(a=2)]).infer_output_type(None) + self.assertEqual( + output_type, row_type.RowTypeConstraint.from_fields([('a', int)])) + + def test_optional_type_for_field(self): + output_type = beam.Create([beam.Row(a=1), + beam.Row(a=None)]).infer_output_type(None) + self.assertEqual( + output_type, + row_type.RowTypeConstraint.from_fields([('a', typing.Optional[int])])) + + def test_none_type_for_field_raises_error(self): + with self.assertRaisesRegex(TypeError, + "('No types found for field %s', 'a')"): + beam.Create([beam.Row(a=None), beam.Row(a=None)]).infer_output_type(None) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 16960201289539b07d46440a63c09eb4d34c6171 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 27 Jul 2025 19:32:38 +0000 Subject: [PATCH 09/15] update core logic --- sdks/python/apache_beam/transforms/core.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 0941d64ac4a0..00547c8a5ae9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3964,16 +3964,14 @@ def infer_output_type(self, unused_input_type): if not self.values: return typehints.Any - try: - first_fields = self.values[0].as_dict().keys() - if not all(v.as_dict().keys() == first_fields for v in self.values): - raise TypeError("All rows must have the same fields.") - except (TypeError, AttributeError): - # For non-Row or inconsistent rows. + # No field data - just use default Union. + if not hasattr(self.values[0], 'as_dict'): return typehints.Union[[ trivial_inference.instance_to_type(v) for v in self.values ]] + first_fields = self.values[0].as_dict().keys() + # Save field types for each field field_types_by_field = defaultdict(set) for row in self.values: @@ -3995,7 +3993,7 @@ def infer_output_type(self, unused_input_type): final_type = non_none_types.pop() elif len(non_none_types) == 1 and len(field_types) == 2: final_type = typing.Optional[non_none_types.pop()] - else: # No available field types + else: raise TypeError("No types found for field %s", field) final_fields.append((field, final_type)) From d9705d87facd1d3db291dd91396179cd34636586 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 27 Jul 2025 20:20:31 +0000 Subject: [PATCH 10/15] fix lint --- sdks/python/apache_beam/transforms/core_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index fd29381aa9d7..405cb5957230 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -31,8 +31,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.window import FixedWindows -from apache_beam.typehints import row_type from apache_beam.typehints import TypeCheckError +from apache_beam.typehints import row_type from apache_beam.typehints import typehints RETURN_NONE_PARTIAL_WARNING = "No iterator is returned" From c1dac3d26e1f24db3c49499af6be765d9d038e10 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Sun, 27 Jul 2025 21:11:43 +0000 Subject: [PATCH 11/15] fix brittle test --- sdks/python/apache_beam/transforms/core_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 405cb5957230..8d5b0d13c5b2 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -326,11 +326,12 @@ def test_typecheck_with_default(self): class CreateInferOutputSchemaTest(unittest.TestCase): def test_multiple_types_for_field_raises_error(self): - with self.assertRaisesRegex( - TypeError, - "('Multiple types found for field %s: %s', 'a', " + - "{, })"): + with self.assertRaises(TypeError) as cm: beam.Create([beam.Row(a=1), beam.Row(a='foo')]).infer_output_type(None) + self.assertEqual( + cm.exception.args[0], "Multiple types found for field %s: %s") + self.assertEqual(cm.exception.args[1], 'a') + self.assertEqual(cm.exception.args[2], {int, str}) def test_single_type_for_field(self): output_type = beam.Create([beam.Row(a=1), From e5bb876e0aeb5dbd30222af9098ad7e4c40b8bdb Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Jul 2025 16:54:45 +0000 Subject: [PATCH 12/15] update multiple type logic --- sdks/python/apache_beam/transforms/core.py | 3 +-- sdks/python/apache_beam/transforms/core_test.py | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 00547c8a5ae9..bf020883a9da 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3987,8 +3987,7 @@ def infer_output_type(self, unused_input_type): non_none_types = {t for t in field_types if t is not type(None)} if len(non_none_types) > 1: - raise TypeError( - "Multiple types found for field %s: %s", field, non_none_types) + final_type = typing.Union[tuple(non_none_types)] elif len(non_none_types) == 1 and len(field_types) == 1: final_type = non_none_types.pop() elif len(non_none_types) == 1 and len(field_types) == 2: diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 8d5b0d13c5b2..7991c9c25dbc 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -325,13 +325,12 @@ def test_typecheck_with_default(self): class CreateInferOutputSchemaTest(unittest.TestCase): - def test_multiple_types_for_field_raises_error(self): - with self.assertRaises(TypeError) as cm: - beam.Create([beam.Row(a=1), beam.Row(a='foo')]).infer_output_type(None) + def test_multiple_types_for_field(self): + output_type = beam.Create([beam.Row(a=1), + beam.Row(a='foo')]).infer_output_type(None) self.assertEqual( - cm.exception.args[0], "Multiple types found for field %s: %s") - self.assertEqual(cm.exception.args[1], 'a') - self.assertEqual(cm.exception.args[2], {int, str}) + output_type, + row_type.RowTypeConstraint.from_fields([('a', typing.Union[int, str])])) def test_single_type_for_field(self): output_type = beam.Create([beam.Row(a=1), From c12d82b4aae59421b4815fce05361bdc46b64997 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Jul 2025 19:59:54 +0000 Subject: [PATCH 13/15] revert changes to assign timestamps and validate --- .../yaml/tests/assign_timestamps.yaml | 66 +++++++------ .../yaml/tests/validate_with_schema.yaml | 96 +++++++++---------- 2 files changed, 79 insertions(+), 83 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml index 572fc3de892e..edaa581214ea 100644 --- a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml +++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml @@ -49,38 +49,36 @@ pipelines: - {user: "alice", timestamp: 3} - {user: "bob", timestamp: 7} - # TODO - Rethink error handling since schema mismatch in Create transform - # isn't allowed anymore. # Assign timestamp to beam row element with error_handling - # - pipeline: - # type: composite - # transforms: - # - type: Create - # name: CreateVisits - # config: - # elements: - # - {user: alice, timestamp: "not-valid"} - # - {user: bob, timestamp: 3} - # - type: AssignTimestamps - # input: CreateVisits - # config: - # timestamp: timestamp - # error_handling: - # output: invalid_rows - # - type: MapToFields - # input: AssignTimestamps.invalid_rows - # config: - # language: python - # fields: - # user: "element.user" - # timestamp: "element.timestamp" - # - type: AssertEqual - # input: MapToFields - # config: - # elements: - # - {user: "alice", timestamp: "not-valid"} - # - type: AssertEqual - # input: AssignTimestamps - # config: - # elements: - # - {user: bob, timestamp: 3} + - pipeline: + type: composite + transforms: + - type: Create + name: CreateVisits + config: + elements: + - {user: alice, timestamp: "not-valid"} + - {user: bob, timestamp: 3} + - type: AssignTimestamps + input: CreateVisits + config: + timestamp: timestamp + error_handling: + output: invalid_rows + - type: MapToFields + input: AssignTimestamps.invalid_rows + config: + language: python + fields: + user: "element.user" + timestamp: "element.timestamp" + - type: AssertEqual + input: MapToFields + config: + elements: + - {user: "alice", timestamp: "not-valid"} + - type: AssertEqual + input: AssignTimestamps + config: + elements: + - {user: bob, timestamp: 3} diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml index 04c744ad1a17..d5ae57a3e8c1 100644 --- a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml +++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml @@ -43,55 +43,53 @@ pipelines: - {name: "Alice", age: 30, score: 95.5} - {name: "Bob", age: 25, score: 88.0} - # TODO - Rethink error handling since schema mismatch in Create transform - # isn't allowed anymore. # Validate a Beam Row with a predefined schema with error handling - # - pipeline: - # type: composite - # transforms: - # - type: Create - # config: - # elements: - # - {name: "Alice", age: 30, score: 95.5} - # - {name: "Bob", age: 25, score: 88.0} - # - {name: "Charlie", age: 27, score: "apple"} - # - {name: "David", age: "twenty", score: 90.0} - # - {name: 30, age: 40, score: 100.0} - # - type: ValidateWithSchema - # input: Create - # config: - # schema: - # type: object - # properties: - # name: - # type: string - # age: - # type: integer - # score: - # type: number - # required: [name, age, score] - # error_handling: - # output: invalid_rows - # - type: MapToFields - # input: ValidateWithSchema.invalid_rows - # config: - # language: python - # fields: - # name: "element.name" - # age: "element.age" - # score: "element.score" - # - type: AssertEqual - # input: MapToFields - # config: - # elements: - # - {name: "Charlie", age: 27, score: "apple"} - # - {name: "David", age: "twenty", score: 90.0} - # - {name: 30, age: 40, score: 100.0} - # - type: AssertEqual - # input: ValidateWithSchema - # config: - # elements: - # - {name: "Alice", age: 30, score: 95.5} - # - {name: "Bob", age: 25, score: 88.0} + - pipeline: + type: composite + transforms: + - type: Create + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} + - {name: "Charlie", age: 27, score: "apple"} + - {name: "David", age: "twenty", score: 90.0} + - {name: 30, age: 40, score: 100.0} + - type: ValidateWithSchema + input: Create + config: + schema: + type: object + properties: + name: + type: string + age: + type: integer + score: + type: number + required: [name, age, score] + error_handling: + output: invalid_rows + - type: MapToFields + input: ValidateWithSchema.invalid_rows + config: + language: python + fields: + name: "element.name" + age: "element.age" + score: "element.score" + - type: AssertEqual + input: MapToFields + config: + elements: + - {name: "Charlie", age: 27, score: "apple"} + - {name: "David", age: "twenty", score: 90.0} + - {name: 30, age: 40, score: 100.0} + - type: AssertEqual + input: ValidateWithSchema + config: + elements: + - {name: "Alice", age: 30, score: 95.5} + - {name: "Bob", age: 25, score: 88.0} From ede7cece702b20ed44d5af3c43ea9662c511d05c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 1 Aug 2025 14:40:34 +0000 Subject: [PATCH 14/15] switch to typehints --- sdks/python/apache_beam/transforms/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bf020883a9da..1de1506159ef 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3987,11 +3987,11 @@ def infer_output_type(self, unused_input_type): non_none_types = {t for t in field_types if t is not type(None)} if len(non_none_types) > 1: - final_type = typing.Union[tuple(non_none_types)] + final_type = typehints.Union[tuple(non_none_types)] elif len(non_none_types) == 1 and len(field_types) == 1: final_type = non_none_types.pop() elif len(non_none_types) == 1 and len(field_types) == 2: - final_type = typing.Optional[non_none_types.pop()] + final_type = typehints.Optional[non_none_types.pop()] else: raise TypeError("No types found for field %s", field) From 64b99ac9987ce13cda495f61abd9322ab8bcf146 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 1 Aug 2025 16:24:47 +0000 Subject: [PATCH 15/15] updated core tests with typehints --- sdks/python/apache_beam/transforms/core_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 7991c9c25dbc..57fa21517349 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -21,7 +21,6 @@ import logging import os import tempfile -import typing import unittest from typing import TypeVar @@ -330,7 +329,9 @@ def test_multiple_types_for_field(self): beam.Row(a='foo')]).infer_output_type(None) self.assertEqual( output_type, - row_type.RowTypeConstraint.from_fields([('a', typing.Union[int, str])])) + row_type.RowTypeConstraint.from_fields([ + ('a', typehints.Union[int, str]) + ])) def test_single_type_for_field(self): output_type = beam.Create([beam.Row(a=1), @@ -343,7 +344,8 @@ def test_optional_type_for_field(self): beam.Row(a=None)]).infer_output_type(None) self.assertEqual( output_type, - row_type.RowTypeConstraint.from_fields([('a', typing.Optional[int])])) + row_type.RowTypeConstraint.from_fields([('a', typehints.Optional[int]) + ])) def test_none_type_for_field_raises_error(self): with self.assertRaisesRegex(TypeError,