diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index db4a652cf97e..da7602b9cda4 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2352,11 +2352,15 @@ def expand(pcoll): else: return pcoll + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 input_count_view = pcoll | 'CountTotal' >> ( - MaybeWindow() | Map(lambda _: 1) + MaybeWindow() | "Map()" >> Map(lambda _: 1) | CombineGlobally(sum).as_singleton_view()) bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> ( - MaybeWindow() | Map(lambda _: 1) + MaybeWindow() | "Map()" >> Map(lambda _: 1) | CombineGlobally(sum).without_defaults()) def check_threshold(bad, total, threshold, window=DoFn.WindowParam): @@ -3538,9 +3542,14 @@ def default_label(self): def expand(self, pcoll): input_type = pcoll.element_type or typing.Any + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( pcoll - | Map(lambda x: (self._key_func()(x), x)).with_output_types( + | "Map()" >> + Map(lambda x: (self._key_func()(x), x)).with_output_types( typehints.Tuple[self._key_type_hint(input_type), input_type]) | GroupByKey()) @@ -3595,14 +3604,19 @@ def expand(self, pcoll): key_type_hint = self._grouping.force_tuple_keys(True)._key_type_hint( pcoll.element_type) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( pcoll - | Map(lambda x: (key_func(x), value_func(x))).with_output_types( + | "Map()" >> + Map(lambda x: (key_func(x), value_func(x))).with_output_types( typehints.Tuple[key_type_hint, typing.Any]) | CombinePerKey( TupleCombineFn( *[combine_fn for _, combine_fn, __ in self._aggregations])) - | MapTuple( + | "MapTuple()" >> MapTuple( lambda key, value: _dynamic_named_tuple('Result', result_fields) (*(key + value)))) @@ -3618,7 +3632,7 @@ class Select(PTransform): is the same as - pcoll | beam.Map(lambda x: beam.Row(a=x.a, b=foo(x))) + pcoll | 'label' >> beam.Map(lambda x: beam.Row(a=x.a, b=foo(x))) """ def __init__( self, @@ -3640,8 +3654,13 @@ def default_label(self): return 'ToRows(%s)' % ', '.join(name for name, _ in self._fields) def expand(self, pcoll): + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( - _MaybePValueWithErrors(pcoll, self._exception_handling_args) | Map( + _MaybePValueWithErrors(pcoll, self._exception_handling_args) + | "Map()" >> Map( lambda x: pvalue.Row( **{ name: expr(x) @@ -4128,10 +4147,15 @@ def expand(self, pcoll): else: return pcoll + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 return ( pbegin | Impulse() - | FlatMap(lambda _: serialized_values).with_output_types(bytes) + | "FlatMap()" >> + FlatMap(lambda _: serialized_values).with_output_types(bytes) | MaybeReshuffle().with_output_types(bytes) | Map(self._coder.decode).with_output_types(self.get_output_type())) diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index ebd760f70f7e..aa86127bd9f8 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -145,7 +145,12 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True): ImplicitSchemaPayloadBuilder({'data': 'middle'}), expansion_service) | beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service) - | beam.Map(lambda kv: '%s: %s' % kv)) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + | "Map()" >> + beam.Map(lambda kv: '%s: %s' % kv)) assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2'])) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 79421ff957b4..5af9d904895a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1441,13 +1441,18 @@ def WithKeys(pcoll, k, *args, **kwargs): if all(isinstance(arg, AsSideInput) for arg in args) and all(isinstance(kwarg, AsSideInput) for kwarg in kwargs.values()): - return pcoll | Map( + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + return pcoll | "Map()" >> Map( lambda v, *args, **kwargs: (k(v, *args, **kwargs), v), *args, **kwargs) - return pcoll | Map(lambda v: (k(v, *args, **kwargs), v)) - return pcoll | Map(lambda v: (k(v), v)) - return pcoll | Map(lambda v: (k, v)) + return pcoll | "Map()" >> Map( + lambda v: (k(v, *args, **kwargs), v)) + return pcoll | "Map()" >> Map(lambda v: (k(v), v)) + return pcoll | "Map()" >> Map(lambda v: (k, v)) @typehints.with_input_types(tuple[K, V]) @@ -1527,7 +1532,11 @@ def __init__( def expand(self, pcoll): key_type, value_type = pcoll.element_type.tuple_types - sharded_pcoll = pcoll | Map( + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + sharded_pcoll = pcoll | "Map()" >> Map( lambda key_value: ( ShardedKey( key_value[0], @@ -2032,7 +2041,12 @@ def replace_all(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) - return pcoll | Map(lambda elem: regex.sub(replacement, elem)) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + return pcoll | "Map()" >> Map( + lambda elem: regex.sub(replacement, elem)) @staticmethod @typehints.with_input_types(str) @@ -2048,7 +2062,12 @@ def replace_first(pcoll, regex, replacement): replacement: the string to be substituted for each match. """ regex = Regex._regex_compile(regex) - return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1)) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + return pcoll | "Map()" >> Map( + lambda elem: regex.sub(replacement, elem, 1)) @staticmethod @typehints.with_input_types(str) @@ -2139,4 +2158,9 @@ def expand(self, pcoll): | f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey())) for (ix, side) in enumerate(self._to_be_waited_on) ] - return pcoll | beam.Map(lambda x, *unused_sides: x, *sides) + # Map(lambda) produces a label formatted like this, but it cannot be + # changed without breaking update compat. Here, we pin to the transform + # name used in the 2.68 release to avoid breaking changes when the line + # number changes. Context: https://github.com/apache/beam/pull/36381 + return pcoll | "Map()" >> beam.Map( + lambda x, *unused_sides: x, *sides)