From 729a8d5d5b6016fb0f7e26d0a745d8cbe5ef2906 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 12:04:17 -0400 Subject: [PATCH 1/5] Add labels to unlabeled transforms --- sdks/python/apache_beam/transforms/core.py | 15 +++++++++------ .../apache_beam/transforms/external_java.py | 3 ++- sdks/python/apache_beam/transforms/util.py | 19 +++++++++++-------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index cbd78d8222e8..4fb764c83ed4 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2352,10 +2352,10 @@ def expand(pcoll): return pcoll input_count_view = pcoll | 'CountTotal' >> ( - MaybeWindow() | Map(lambda _: 1) + MaybeWindow() | "Map()" >> Map(lambda _: 1) | CombineGlobally(sum).as_singleton_view()) bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> ( - MaybeWindow() | Map(lambda _: 1) + MaybeWindow() | "Map()" >> Map(lambda _: 1) | CombineGlobally(sum).without_defaults()) def check_threshold(bad, total, threshold, window=DoFn.WindowParam): @@ -3521,7 +3521,8 @@ def expand(self, pcoll): input_type = pcoll.element_type or typing.Any return ( pcoll - | Map(lambda x: (self._key_func()(x), x)).with_output_types( + | "Map()" >> + Map(lambda x: (self._key_func()(x), x)).with_output_types( typehints.Tuple[self._key_type_hint(input_type), input_type]) | GroupByKey()) @@ -3578,7 +3579,8 @@ def expand(self, pcoll): return ( pcoll - | Map(lambda x: (key_func(x), value_func(x))).with_output_types( + | "Map()" >> + Map(lambda x: (key_func(x), value_func(x))).with_output_types( typehints.Tuple[key_type_hint, typing.Any]) | CombinePerKey( TupleCombineFn( @@ -3599,7 +3601,7 @@ class Select(PTransform): is the same as - pcoll | beam.Map(lambda x: beam.Row(a=x.a, b=foo(x))) + pcoll | 'label' >> beam.Map(lambda x: beam.Row(a=x.a, b=foo(x))) """ def __init__( self, @@ -3622,7 +3624,8 @@ def default_label(self): def expand(self, pcoll): return ( - _MaybePValueWithErrors(pcoll, self._exception_handling_args) | Map( + _MaybePValueWithErrors(pcoll, self._exception_handling_args) + | "Map()" >> Map( lambda x: pvalue.Row( **{ name: expr(x) diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index ebd760f70f7e..140ebe8f8565 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -145,7 +145,8 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True): ImplicitSchemaPayloadBuilder({'data': 'middle'}), expansion_service) | beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service) - | beam.Map(lambda kv: '%s: %s' % kv)) + | "Map()" >> beam.Map( + lambda kv: '%s: %s' % kv)) assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2'])) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index c63478dc0cfc..bbc546b32cdc 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1393,13 +1393,14 @@ def WithKeys(pcoll, k, *args, **kwargs): if all(isinstance(arg, AsSideInput) for arg in args) and all(isinstance(kwarg, AsSideInput) for kwarg in kwargs.values()): - return pcoll | Map( + return pcoll | "Map()" >> Map( lambda v, *args, **kwargs: (k(v, *args, **kwargs), v), *args, **kwargs) - return pcoll | Map(lambda v: (k(v, *args, **kwargs), v)) - return pcoll | Map(lambda v: (k(v), v)) 
- return pcoll | Map(lambda v: (k, v)) + return pcoll | "Map()" >> Map( + lambda v: (k(v, *args, **kwargs), v)) + return pcoll | "Map()" >> Map(lambda v: (k(v), v)) + return pcoll | "Map()" >> Map(lambda v: (k, v)) @typehints.with_input_types(tuple[K, V]) @@ -1479,7 +1480,7 @@ def __init__( def expand(self, pcoll): key_type, value_type = pcoll.element_type.tuple_types - sharded_pcoll = pcoll | Map( + sharded_pcoll = pcoll | "Map()" >> Map( lambda key_value: ( ShardedKey( key_value[0], @@ -1984,7 +1985,8 @@ def replace_all(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) - return pcoll | Map(lambda elem: regex.sub(replacement, elem)) + return pcoll | "Map()" >> Map( + lambda elem: regex.sub(replacement, elem)) @staticmethod @typehints.with_input_types(str) @@ -2000,7 +2002,8 @@ def replace_first(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) - return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1)) + return pcoll | "Map()" >> Map( + lambda elem: regex.sub(replacement, elem, 1)) @staticmethod @typehints.with_input_types(str) @@ -2091,4 +2094,4 @@ def expand(self, pcoll): | f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey())) for (ix, side) in enumerate(self._to_be_waited_on) ] - return pcoll | beam.Map(lambda x, *unused_sides: x, *sides) + return pcoll | "Map()" >> beam.Map(lambda x, *unused_sides: x, *sides) From 2b2d086dca0a85af6dbe98bd61ab891dca5ba966 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 13:18:38 -0400 Subject: [PATCH 2/5] yapf --- sdks/python/apache_beam/transforms/external_java.py | 4 ++-- sdks/python/apache_beam/transforms/util.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index 140ebe8f8565..a7a422caa285 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -145,8 +145,8 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True): ImplicitSchemaPayloadBuilder({'data': 'middle'}), expansion_service) | beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service) - | "Map()" >> beam.Map( - lambda kv: '%s: %s' % kv)) + | "Map()" >> + beam.Map(lambda kv: '%s: %s' % kv)) assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2'])) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index bbc546b32cdc..7f18b3e47af8 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -2094,4 +2094,5 @@ def expand(self, pcoll): | f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey())) for (ix, side) in enumerate(self._to_be_waited_on) ] - return pcoll | "Map()" >> beam.Map(lambda x, *unused_sides: x, *sides) + return pcoll | "Map()" >> beam.Map( + lambda x, *unused_sides: x, *sides) From 90e7cc37422983cf310ef35c8b207b5342ffb783 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 13:44:27 -0400 Subject: [PATCH 3/5] context + a few more --- sdks/python/apache_beam/transforms/core.py | 28 +++++++++++++++++-- .../apache_beam/transforms/external_java.py | 4 +++ sdks/python/apache_beam/transforms/util.py | 20 +++++++++++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py 
index 4fb764c83ed4..a9b8e7f8a593 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2351,6 +2351,10 @@ def expand(pcoll): else: return pcoll + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 input_count_view = pcoll | 'CountTotal' >> ( MaybeWindow() | "Map()" >> Map(lambda _: 1) | CombineGlobally(sum).as_singleton_view()) @@ -3519,6 +3523,10 @@ def default_label(self): def expand(self, pcoll): input_type = pcoll.element_type or typing.Any + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( pcoll | "Map()" >> @@ -3577,6 +3585,10 @@ def expand(self, pcoll): key_type_hint = self._grouping.force_tuple_keys(True)._key_type_hint( pcoll.element_type) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( pcoll | "Map()" >> @@ -3585,7 +3597,7 @@ def expand(self, pcoll): | CombinePerKey( TupleCombineFn( *[combine_fn for _, combine_fn, __ in self._aggregations])) - | MapTuple( + | "MapTuple()" >> MapTuple( lambda key, value: _dynamic_named_tuple('Result', result_fields) (*(key + value)))) @@ -3623,6 +3635,10 @@ def default_label(self): return 'ToRows(%s)' % ', '.join(name for name, _ in self._fields) def expand(self, pcoll): + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( _MaybePValueWithErrors(pcoll, self._exception_handling_args) | "Map()" >> Map( @@ -4112,12 +4128,18 @@ def expand(self, pcoll): else: return pcoll + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. 
Context: https://github.com/apache/beam/pull/36381 return ( pbegin | Impulse() - | FlatMap(lambda _: serialized_values).with_output_types(bytes) + | "FlatMap()" >> + FlatMap(lambda _: serialized_values).with_output_types(bytes) | MaybeReshuffle().with_output_types(bytes) - | Map(self._coder.decode).with_output_types(self.get_output_type())) + | "Map()" >> Map( + self._coder.decode).with_output_types(self.get_output_type())) def as_read(self): from apache_beam.io import iobase diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index a7a422caa285..aa86127bd9f8 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -145,6 +145,10 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True): ImplicitSchemaPayloadBuilder({'data': 'middle'}), expansion_service) | beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 | "Map()" >> beam.Map(lambda kv: '%s: %s' % kv)) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 7f18b3e47af8..008e2e7aaff6 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1393,6 +1393,10 @@ def WithKeys(pcoll, k, *args, **kwargs): if all(isinstance(arg, AsSideInput) for arg in args) and all(isinstance(kwarg, AsSideInput) for kwarg in kwargs.values()): + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return pcoll | "Map()" >> Map( lambda v, *args, **kwargs: (k(v, *args, **kwargs), v), *args, @@ -1480,6 +1484,10 @@ def __init__( def expand(self, pcoll): key_type, value_type = pcoll.element_type.tuple_types + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 sharded_pcoll = pcoll | "Map()" >> Map( lambda key_value: ( ShardedKey( @@ -1985,6 +1993,10 @@ def replace_all(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return pcoll | "Map()" >> Map( lambda elem: regex.sub(replacement, elem)) @@ -2002,6 +2014,10 @@ def replace_first(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. 
Context: https://github.com/apache/beam/pull/36381 return pcoll | "Map()" >> Map( lambda elem: regex.sub(replacement, elem, 1)) @@ -2094,5 +2110,9 @@ def expand(self, pcoll): | f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey())) for (ix, side) in enumerate(self._to_be_waited_on) ] + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return pcoll | "Map()" >> beam.Map( lambda x, *unused_sides: x, *sides) From e9b1546ed0e6f26336aac44e91496314ab470b1e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 13:49:59 -0400 Subject: [PATCH 4/5] correct a few lines --- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/transforms/util.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a9b8e7f8a593..a2c395b4a683 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3641,7 +3641,7 @@ def expand(self, pcoll): # number changes. Context: https://github.com/apache/beam/pull/36381 return ( _MaybePValueWithErrors(pcoll, self._exception_handling_args) - | "Map()" >> Map( + | "Map()" >> Map( lambda x: pvalue.Row( **{ name: expr(x) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 008e2e7aaff6..7baef8d78294 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1397,7 +1397,7 @@ def WithKeys(pcoll, k, *args, **kwargs): # changed without breaking update compat. Here, we pin to the transform # name used in the 2.68 release to avoid breaking changes when the line # number changes. Context: https://github.com/apache/beam/pull/36381 - return pcoll | "Map()" >> Map( + return pcoll | "Map()" >> Map( lambda v, *args, **kwargs: (k(v, *args, **kwargs), v), *args, **kwargs) @@ -1488,7 +1488,7 @@ def expand(self, pcoll): # changed without breaking update compat. Here, we pin to the transform # name used in the 2.68 release to avoid breaking changes when the line # number changes. Context: https://github.com/apache/beam/pull/36381 - sharded_pcoll = pcoll | "Map()" >> Map( + sharded_pcoll = pcoll | "Map()" >> Map( lambda key_value: ( ShardedKey( key_value[0], From ed5a52adc7089a25decc94413ddb81d49d4cf476 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 15:13:02 -0400 Subject: [PATCH 5/5] Undo bad edit --- sdks/python/apache_beam/transforms/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a2c395b4a683..c18ba16c389f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -4138,8 +4138,7 @@ def expand(self, pcoll): | "FlatMap()" >> FlatMap(lambda _: serialized_values).with_output_types(bytes) | MaybeReshuffle().with_output_types(bytes) - | "Map()" >> Map( - self._coder.decode).with_output_types(self.get_output_type())) + | Map(self._coder.decode).with_output_types(self.get_output_type())) def as_read(self): from apache_beam.io import iobase
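For context on the recurring comment in these patches: when Map, FlatMap, or MapTuple is applied without an explicit label, the Beam Python SDK derives the step name from the callable, and for a lambda that name typically embeds the source file and line number, so unrelated edits that shift lines rename the step and can break runner updates (for example Dataflow --update), which match transforms by name. The snippet below is a minimal illustrative sketch, not code from this PR; the pipeline, labels, and file name are assumptions. It shows the general pattern the patches apply: pin an explicit label with ">>" so the step name stays stable across releases and line moves.

    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | "Create" >> beam.Create([("a", 1), ("b", 2)])
            # Without the explicit label, this step would be named from the
            # lambda, e.g. "Map(<lambda at my_pipeline.py:10>)", so moving
            # the line would rename the step.
            | "FormatKv" >> beam.Map(lambda kv: "%s: %s" % kv)
            | "Print" >> beam.Map(print))

Per the comment added throughout the diff, the pinned label (literally "Map()" in these files) is the transform name used in the 2.68 release, so pipelines already running with that name keep matching the same steps after this change.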