Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from apache_beam.yaml.yaml_transform import ensure_errors_consumed
from apache_beam.yaml.yaml_transform import ensure_transforms_have_types
from apache_beam.yaml.yaml_transform import expand_composite_transform
from apache_beam.yaml.yaml_transform import expand_pipeline
from apache_beam.yaml.yaml_transform import extract_name
from apache_beam.yaml.yaml_transform import identify_object
from apache_beam.yaml.yaml_transform import normalize_inputs_outputs
Expand Down Expand Up @@ -988,6 +989,69 @@ def test_init_with_dict(self):
self.assertEqual(result._spec['type'], "composite") # preprocessed spec


class ExpandPipelineTest(unittest.TestCase):
def test_expand_pipeline_with_pipeline_key_only(self):
spec = '''
pipeline:
type: chain
transforms:
- type: Create
config:
elements: [1,2,3]
- type: LogForTesting
'''
with new_pipeline() as p:
expand_pipeline(p, spec, validate_schema=None)

def test_expand_pipeline_with_pipeline_and_option_keys(self):
spec = '''
pipeline:
type: chain
transforms:
- type: Create
config:
elements: [1,2,3]
- type: LogForTesting
options:
streaming: false
'''
with new_pipeline() as p:
expand_pipeline(p, spec, validate_schema=None)

def test_expand_pipeline_with_extra_top_level_keys(self):
spec = '''
template:
version: "1.0"
author: "test_user"

pipeline:
type: chain
transforms:
- type: Create
config:
elements: [1,2,3]
- type: LogForTesting

other_metadata: "This is an ignored comment."
'''
with new_pipeline() as p:
expand_pipeline(p, spec, validate_schema=None)

def test_expand_pipeline_with_incorrect_pipelines_key_fails(self):
spec = '''
pipelines:
type: chain
transforms:
- type: Create
config:
elements: [1,2,3]
- type: LogForTesting
'''
with new_pipeline() as p:
with self.assertRaises(KeyError):
expand_pipeline(p, spec, validate_schema=None)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Loading