From f5fc7785b79fcfeab8f25be695c217aa60fca28b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Sep 2025 14:45:39 +0000 Subject: [PATCH 1/3] rebase --- .../yaml/yaml_transform_unit_test.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index 0ea95228aeee..e418b1250810 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -33,6 +33,7 @@ from apache_beam.yaml.yaml_transform import normalize_inputs_outputs from apache_beam.yaml.yaml_transform import normalize_source_sink from apache_beam.yaml.yaml_transform import only_element +from apache_beam.yaml.yaml_transform import expand_pipeline from apache_beam.yaml.yaml_transform import pipeline_as_composite from apache_beam.yaml.yaml_transform import preprocess from apache_beam.yaml.yaml_transform import preprocess_flattened_inputs @@ -988,6 +989,27 @@ def test_init_with_dict(self): self.assertEqual(result._spec['type'], "composite") # preprocessed spec +class ExpandPipelineTest(unittest.TestCase): + def test_expand_pipeline_with_extra_top_level_keys(self): + spec = ''' + template: + version: "1.0" + author: "test_user" + + pipeline: + type: chain + transforms: + - type: Create + config: + elements: [1,2,3] + - type: LogForTesting + + other_metadata: "This is an ignored comment." + ''' + with new_pipeline() as p: + expand_pipeline(p, spec, validate_schema=None) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 2d6b84d1ad7b0bccefd99daca19805de18852155 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Sep 2025 15:00:48 +0000 Subject: [PATCH 2/3] add more tests --- .../yaml/yaml_transform_unit_test.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index e418b1250810..21f4f5dd7d4c 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -990,6 +990,34 @@ def test_init_with_dict(self): class ExpandPipelineTest(unittest.TestCase): + def test_expand_pipeline_with_pipeline_key_only(self): + spec = ''' + pipeline: + type: chain + transforms: + - type: Create + config: + elements: [1,2,3] + - type: LogForTesting + ''' + with new_pipeline() as p: + expand_pipeline(p, spec, validate_schema=None) + + def test_expand_pipeline_with_pipeline_and_option_keys(self): + spec = ''' + pipeline: + type: chain + transforms: + - type: Create + config: + elements: [1,2,3] + - type: LogForTesting + options: + streaming: false + ''' + with new_pipeline() as p: + expand_pipeline(p, spec, validate_schema=None) + def test_expand_pipeline_with_extra_top_level_keys(self): spec = ''' template: @@ -1009,6 +1037,20 @@ def test_expand_pipeline_with_extra_top_level_keys(self): with new_pipeline() as p: expand_pipeline(p, spec, validate_schema=None) + def test_expand_pipeline_with_incorrect_pipelines_key_fails(self): + spec = ''' + pipelines: + type: chain + transforms: + - type: Create + config: + elements: [1,2,3] + - type: LogForTesting + ''' + with new_pipeline() as p: + with self.assertRaises(KeyError): + expand_pipeline(p, spec, validate_schema=None) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From f02170696d491ec60d777fa77106c93588cd21f8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 8 Sep 2025 16:39:45 +0000 Subject: [PATCH 3/3] fix lint issue --- sdks/python/apache_beam/yaml/yaml_transform_unit_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index 21f4f5dd7d4c..eceac0b1857f 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -28,12 +28,12 @@ from apache_beam.yaml.yaml_transform import ensure_errors_consumed from apache_beam.yaml.yaml_transform import ensure_transforms_have_types from apache_beam.yaml.yaml_transform import expand_composite_transform +from apache_beam.yaml.yaml_transform import expand_pipeline from apache_beam.yaml.yaml_transform import extract_name from apache_beam.yaml.yaml_transform import identify_object from apache_beam.yaml.yaml_transform import normalize_inputs_outputs from apache_beam.yaml.yaml_transform import normalize_source_sink from apache_beam.yaml.yaml_transform import only_element -from apache_beam.yaml.yaml_transform import expand_pipeline from apache_beam.yaml.yaml_transform import pipeline_as_composite from apache_beam.yaml.yaml_transform import preprocess from apache_beam.yaml.yaml_transform import preprocess_flattened_inputs