Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2352,11 +2352,15 @@ def expand(pcoll):
else:
return pcoll

# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
input_count_view = pcoll | 'CountTotal' >> (
MaybeWindow() | Map(lambda _: 1)
MaybeWindow() | "Map(<lambda at core.py:2346>)" >> Map(lambda _: 1)
| CombineGlobally(sum).as_singleton_view())
bad_count_pcoll = result[self._dead_letter_tag] | 'CountBad' >> (
MaybeWindow() | Map(lambda _: 1)
MaybeWindow() | "Map(<lambda at core.py:2349>)" >> Map(lambda _: 1)
| CombineGlobally(sum).without_defaults())

def check_threshold(bad, total, threshold, window=DoFn.WindowParam):
Expand Down Expand Up @@ -3538,9 +3542,14 @@ def default_label(self):

def expand(self, pcoll):
input_type = pcoll.element_type or typing.Any
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return (
pcoll
| Map(lambda x: (self._key_func()(x), x)).with_output_types(
| "Map(<lambda at core.py:3503>)" >>
Map(lambda x: (self._key_func()(x), x)).with_output_types(
typehints.Tuple[self._key_type_hint(input_type), input_type])
| GroupByKey())

Expand Down Expand Up @@ -3595,14 +3604,19 @@ def expand(self, pcoll):
key_type_hint = self._grouping.force_tuple_keys(True)._key_type_hint(
pcoll.element_type)

# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return (
pcoll
| Map(lambda x: (key_func(x), value_func(x))).with_output_types(
| "Map(<lambda at core.py:3560>)" >>
Map(lambda x: (key_func(x), value_func(x))).with_output_types(
typehints.Tuple[key_type_hint, typing.Any])
| CombinePerKey(
TupleCombineFn(
*[combine_fn for _, combine_fn, __ in self._aggregations]))
| MapTuple(
| "MapTuple(<lambda at core.py:3565>)" >> MapTuple(
lambda key, value: _dynamic_named_tuple('Result', result_fields)
(*(key + value))))

Expand All @@ -3618,7 +3632,7 @@ class Select(PTransform):

is the same as

pcoll | beam.Map(lambda x: beam.Row(a=x.a, b=foo(x)))
pcoll | 'label' >> beam.Map(lambda x: beam.Row(a=x.a, b=foo(x)))
"""
def __init__(
self,
Expand All @@ -3640,8 +3654,13 @@ def default_label(self):
return 'ToRows(%s)' % ', '.join(name for name, _ in self._fields)

def expand(self, pcoll):
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return (
_MaybePValueWithErrors(pcoll, self._exception_handling_args) | Map(
_MaybePValueWithErrors(pcoll, self._exception_handling_args)
| "Map(<lambda at core.py:3605>)" >> Map(
lambda x: pvalue.Row(
**{
name: expr(x)
Expand Down Expand Up @@ -4128,10 +4147,15 @@ def expand(self, pcoll):
else:
return pcoll

# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return (
pbegin
| Impulse()
| FlatMap(lambda _: serialized_values).with_output_types(bytes)
| "FlatMap(<lambda at core.py:4094>)" >>
FlatMap(lambda _: serialized_values).with_output_types(bytes)
| MaybeReshuffle().with_output_types(bytes)
| Map(self._coder.decode).with_output_types(self.get_output_type()))

Expand Down
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/transforms/external_java.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True):
ImplicitSchemaPayloadBuilder({'data': 'middle'}),
expansion_service)
| beam.ExternalTransform(TEST_COUNT_URN, None, expansion_service)
| beam.Map(lambda kv: '%s: %s' % kv))
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
| "Map(<lambda at external_java.py:148>)" >>
beam.Map(lambda kv: '%s: %s' % kv))

assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2']))

Expand Down
40 changes: 32 additions & 8 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,13 +1441,18 @@ def WithKeys(pcoll, k, *args, **kwargs):
if all(isinstance(arg, AsSideInput)
for arg in args) and all(isinstance(kwarg, AsSideInput)
for kwarg in kwargs.values()):
return pcoll | Map(
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return pcoll | "Map(<lambda at util.py:1189>)" >> Map(
lambda v, *args, **kwargs: (k(v, *args, **kwargs), v),
*args,
**kwargs)
return pcoll | Map(lambda v: (k(v, *args, **kwargs), v))
return pcoll | Map(lambda v: (k(v), v))
return pcoll | Map(lambda v: (k, v))
return pcoll | "Map(<lambda at util.py:1192>)" >> Map(
lambda v: (k(v, *args, **kwargs), v))
return pcoll | "Map(<lambda at util.py:1193>)" >> Map(lambda v: (k(v), v))
return pcoll | "Map(<lambda at util.py:1194>)" >> Map(lambda v: (k, v))


@typehints.with_input_types(tuple[K, V])
Expand Down Expand Up @@ -1527,7 +1532,11 @@ def __init__(

def expand(self, pcoll):
key_type, value_type = pcoll.element_type.tuple_types
sharded_pcoll = pcoll | Map(
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
sharded_pcoll = pcoll | "Map(<lambda at util.py:1275>)" >> Map(
lambda key_value: (
ShardedKey(
key_value[0],
Expand Down Expand Up @@ -2032,7 +2041,12 @@ def replace_all(pcoll, regex, replacement):
replacement: the string to be substituted for each match.
"""
regex = Regex._regex_compile(regex)
return pcoll | Map(lambda elem: regex.sub(replacement, elem))
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return pcoll | "Map(<lambda at util.py:1779>)" >> Map(
lambda elem: regex.sub(replacement, elem))

@staticmethod
@typehints.with_input_types(str)
Expand All @@ -2048,7 +2062,12 @@ def replace_first(pcoll, regex, replacement):
replacement: the string to be substituted for each match.
"""
regex = Regex._regex_compile(regex)
return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1))
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return pcoll | "Map(<lambda at util.py:1795>)" >> Map(
lambda elem: regex.sub(replacement, elem, 1))

@staticmethod
@typehints.with_input_types(str)
Expand Down Expand Up @@ -2139,4 +2158,9 @@ def expand(self, pcoll):
| f"WaitOn{ix}" >> (beam.FlatMap(lambda x: ()) | GroupByKey()))
for (ix, side) in enumerate(self._to_be_waited_on)
]
return pcoll | beam.Map(lambda x, *unused_sides: x, *sides)
# Map(lambda) produces a label formatted like this, but it cannot be
# changed without breaking update compat. Here, we pin to the transform
# name used in the 2.68 release to avoid breaking changes when the line
# number changes. Context: https://github.com/apache/beam/pull/36381
return pcoll | "Map(<lambda at util.py:1886>)" >> beam.Map(
lambda x, *unused_sides: x, *sides)
Loading