From 38eee8f48bfba6de78c809fb3399e67a9fd30e7e Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 25 Jun 2025 20:12:40 -0400 Subject: [PATCH 1/6] fixed the beam yaml create issue by create the full dict --- .../python/apache_beam/yaml/tests/create.yaml | 19 ++++++++++- sdks/python/apache_beam/yaml/yaml_provider.py | 34 +++++++++++++++++-- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml index bed364c17143..723d8a888c26 100644 --- a/sdks/python/apache_beam/yaml/tests/create.yaml +++ b/sdks/python/apache_beam/yaml/tests/create.yaml @@ -30,7 +30,7 @@ pipelines: - {element: 2} - {element: 3} - {element: 4} - - {element: 5} + - {element: 5} # Simple Create with more complex beam row - pipeline: @@ -64,3 +64,20 @@ pipelines: - {first: 0, second: [1,2,3]} - {first: 1, second: [4,5,6]} - {first: 2, second: [7,8,9]} + + # Simple Create with element list + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: Flume} + - {sdk: MillWheel, year: 2008} + - type: AssertEqual + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: Flume} + - {sdk: MillWheel, year: 2008} diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 7c8114b57706..d79b923bf886 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -785,9 +785,17 @@ def __init__(self, elements: Iterable[Any]): self._elements = elements def expand(self, pcoll): + def to_dict(row): + if isinstance(row, beam.Row): + return {k: v for k, v in row._asdict().items() if v is not None} + elif hasattr(row, '_asdict'): + return {k: v for k, v in row._asdict().items() if v is not None} + else: + return row + return assert_that( - pcoll | beam.Map(lambda row: beam.Row(**row._asdict())), - equal_to(dicts_to_rows(self._elements))) + pcoll | beam.Map(to_dict), + equal_to([to_dict(e) for e in dicts_to_rows(self._elements)])) @staticmethod def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): @@ -838,7 +846,27 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): # not the intent. if not isinstance(elements, Iterable) or isinstance(elements, (dict, str)): raise TypeError('elements must be a list of elements') - return beam.Create([element_to_rows(e) for e in elements], + + # Merge all dictionaries to get all possible keys + all_keys = set() + for element in elements: + if isinstance(element, dict): + all_keys.update(element.keys()) + + # Create a merged dictionary with all keys + merged_dict = {} + for key in all_keys: + merged_dict[key] = None # Use None as a default value + + # Update each element with the merged dictionary + updated_elements = [] + for e in elements: + if isinstance(e, dict): + updated_elements.append({**merged_dict, **e}) + else: + updated_elements.append(e) + + return beam.Create([element_to_rows(e) for e in updated_elements], reshuffle=reshuffle is not False) # Or should this be posargs, args? From a25a82425a05925cd30977b34cbf831ae6a16c43 Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 25 Jun 2025 22:19:02 -0400 Subject: [PATCH 2/6] sorted the elements --- sdks/python/apache_beam/yaml/yaml_provider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index d79b923bf886..a531cc68a30c 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -787,9 +787,9 @@ def __init__(self, elements: Iterable[Any]): def expand(self, pcoll): def to_dict(row): if isinstance(row, beam.Row): - return {k: v for k, v in row._asdict().items() if v is not None} + return dict(sorted({k: v for k, v in row._asdict().items() if v is not None}.items())) elif hasattr(row, '_asdict'): - return {k: v for k, v in row._asdict().items() if v is not None} + return dict(sorted({k: v for k, v in row._asdict().items() if v is not None}.items())) else: return row From d4b620326912efaa7b09d2b85c8f1b51ba89259b Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 26 Jun 2025 07:44:10 -0400 Subject: [PATCH 3/6] fixed assert --- sdks/python/apache_beam/yaml/yaml_provider.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index a531cc68a30c..c2a9c95c7fde 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -786,12 +786,8 @@ def __init__(self, elements: Iterable[Any]): def expand(self, pcoll): def to_dict(row): - if isinstance(row, beam.Row): - return dict(sorted({k: v for k, v in row._asdict().items() if v is not None}.items())) - elif hasattr(row, '_asdict'): - return dict(sorted({k: v for k, v in row._asdict().items() if v is not None}.items())) - else: - return row + # filter None when comparing + return {k: v for k, v in row._asdict().items() if v is not None}.items() return assert_that( pcoll | beam.Map(to_dict), From 1caad9a65aa676c3a7eec7154394af2158fd49e0 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 26 Jun 2025 09:26:42 -0400 Subject: [PATCH 4/6] optimized elements --- sdks/python/apache_beam/yaml/yaml_provider.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index c2a9c95c7fde..e934d79def3c 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -843,24 +843,29 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): if not isinstance(elements, Iterable) or isinstance(elements, (dict, str)): raise TypeError('elements must be a list of elements') - # Merge all dictionaries to get all possible keys - all_keys = set() - for element in elements: - if isinstance(element, dict): - all_keys.update(element.keys()) - - # Create a merged dictionary with all keys - merged_dict = {} - for key in all_keys: - merged_dict[key] = None # Use None as a default value - - # Update each element with the merged dictionary - updated_elements = [] - for e in elements: - if isinstance(e, dict): - updated_elements.append({**merged_dict, **e}) - else: - updated_elements.append(e) + # Check if elements have different keys + updated_elements = elements + if elements and all(isinstance(e, dict) for e in elements): + keys = [set(e.keys()) for e in elements] + if len(set.union(*keys)) > min(len(k) for k in keys): + # Merge all dictionaries to get all possible keys + all_keys = set() + for element in elements: + if isinstance(element, dict): + all_keys.update(element.keys()) + + # Create a merged dictionary with all keys + merged_dict = {} + for key in all_keys: + merged_dict[key] = None # Use None as a default value + + # Update each element with the merged dictionary + updated_elements = [] + for e in elements: + if isinstance(e, dict): + updated_elements.append({**merged_dict, **e}) + else: + updated_elements.append(e) return beam.Create([element_to_rows(e) for e in updated_elements], reshuffle=reshuffle is not False) From 2f56620588b0edaa472964b801bd01a3e679d140 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 26 Jun 2025 09:32:27 -0400 Subject: [PATCH 5/6] fixed AssertEqual --- sdks/python/apache_beam/yaml/yaml_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index e934d79def3c..ba6e988e719b 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -787,7 +787,7 @@ def __init__(self, elements: Iterable[Any]): def expand(self, pcoll): def to_dict(row): # filter None when comparing - return {k: v for k, v in row._asdict().items() if v is not None}.items() + return dict({k: v for k, v in row._asdict().items() if v is not None}.items()) return assert_that( pcoll | beam.Map(to_dict), From 4f5b95a2eb41736db4bbd8ee21d86f2771a2ca82 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 26 Jun 2025 15:00:41 -0400 Subject: [PATCH 6/6] fixed the lint --- sdks/python/apache_beam/yaml/yaml_provider.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index ba6e988e719b..42087565013c 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -787,7 +787,8 @@ def __init__(self, elements: Iterable[Any]): def expand(self, pcoll): def to_dict(row): # filter None when comparing - return dict({k: v for k, v in row._asdict().items() if v is not None}.items()) + temp_dict = {k: v for k, v in row._asdict().items() if v is not None} + return dict(temp_dict.items()) return assert_that( pcoll | beam.Map(to_dict),