From f777002c1fe75fa09f64be0784adc46aa0d25443 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 10:19:13 -0400 Subject: [PATCH 01/15] implement lambda name pickling in cloudpickle --- .../internal/cloudpickle/cloudpickle.py | 20 +++++++++++-- .../internal/cloudpickle_pickler.py | 4 ++- .../internal/code_object_pickler.py | 29 ++++++++++++++----- .../apache_beam/internal/dill_pickler.py | 7 ++++- sdks/python/apache_beam/internal/pickler.py | 6 ++-- .../apache_beam/internal/pickler_test.py | 16 ++++++++++ .../apache_beam/runners/pipeline_context.py | 1 + .../apache_beam/transforms/ptransform.py | 2 ++ 8 files changed, 70 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 5a9d89430fd3..46d2cf01f6ae 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -526,6 +526,11 @@ def _make_function(code, globals, name, argdefs, closure): return types.FunctionType(code, globals, name, argdefs, closure) +def _make_function_from_identifier(code_path, globals, name, argdefs, closure): + fcode = get_code_from_identifier(code_path) + return _make_function(fcode, globals, name, argdefs, closure) + + def _make_empty_cell(): if False: # trick the compiler into creating an empty cell in our lambda @@ -1266,7 +1271,11 @@ def _dynamic_function_reduce(self, func): """Reduce a function that is not pickleable via attribute lookup.""" newargs = self._function_getnewargs(func) state = _function_getstate(func) - return (_make_function, newargs, state, None, None, _function_setstate) + if type(newargs[0]) == str: + make_function - _make_function_from_identifier + else: + make_function = _make_function + return (make_function, newargs, state, None, None, _function_setstate) def _function_reduce(self, obj): """Reducer for function objects. 
@@ -1283,6 +1292,7 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): + code_path = get_code_object_indentifier(func) if self.enable_lambda_name else None code = func.__code__ # base_globals represents the future global namespace of func at @@ -1313,7 +1323,10 @@ def _function_getnewargs(self, func): else: closure = tuple(_make_empty_cell() for _ in range(len(code.co_freevars))) - return code, base_globals, None, None, closure + if code_path: + return code_path, base_globals, None, None, closure + else: + return code, base_globals, None, None, closure def dump(self, obj): try: @@ -1326,7 +1339,8 @@ def dump(self, obj): raise def __init__( - self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG): + self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG, + enable_lambda_name=False): if protocol is None: protocol = DEFAULT_PROTOCOL super().__init__(file, protocol=protocol, buffer_callback=buffer_callback) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 63038e770f27..757d6a19682c 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -117,6 +117,7 @@ def dumps( enable_trace=True, use_zlib=False, enable_best_effort_determinism=False, + enable_lambda_name=False, config: cloudpickle.CloudPickleConfig = DEFAULT_CONFIG) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" if enable_best_effort_determinism: @@ -127,7 +128,8 @@ def dumps( 'This has only been implemented for dill.') with _pickle_lock: with io.BytesIO() as file: - pickler = cloudpickle.CloudPickler(file, config=config) + pickler = cloudpickle.CloudPickler( + file, config=config, enable_lambda_name=enable_lambda_name) try: pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git 
a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index b6ea015cc06f..0d0f4a3005fe 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -315,10 +315,10 @@ def _search_lambda( _SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') # Matches a path like: co_consts[, ('x',)] _LAMBDA_WITH_ARGS_PATTERN = re.compile( - r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") + r"co_consts\[(<.*?>),\s(\('[^']+'(?:,\s*'[^']+')*,?\))\]") # Matches a path like: co_consts[, ('x',), 1234567890] _LAMBDA_WITH_HASH_PATTERN = re.compile( - r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") + r"co_consts\[(<[^>]+>),\s*(\([^\)]*\)),?\s*(.*)\]") # Matches a path like: __defaults__[0] _DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') # Matches an argument like: 'x' @@ -375,7 +375,7 @@ def _get_code_object_from_lambda_with_args_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) - if obj_.co_varnames == args: + if obj_.co_varnames[:_get_arg_count(obj_)] == args: return obj_ raise AttributeError(f'Could not find code object with path: {path}') @@ -404,7 +404,7 @@ def _get_code_object_from_lambda_with_hash_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) - if obj_.co_varnames == args: + if obj_.co_varnames[:_get_arg_count(obj_)] == args: hash_value = lambda_with_hash_result.group(3) if hash_value == str(_create_bytecode_hash(obj_)): return obj_ @@ -462,12 +462,25 @@ def _signature(obj: types.CodeType): Returns: A tuple of the names of the arguments of the code object. 
""" - arg_count = ( - obj.co_argcount + obj.co_kwonlyargcount + - (obj.co_flags & 4 == 4) # PyCF_VARARGS + return obj.co_varnames[:_get_arg_count(obj)] + + +def _get_arg_count(obj: types.CodeType): + """Returns the number of arguments of a code object. + + Args: + obj: A code object, function, method, or cell. + + Returns: + The number of arguments of the code object, or None if the object is not a + code object. + """ + return ( + obj.co_argcount + + obj.co_kwonlyargcount + + (obj.co_flags & 4 == 4) # PyCF_VARARGS + (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS ) - return obj.co_varnames[:arg_count] def _create_bytecode_hash(code_object: types.CodeType): diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index 9a3d43826610..bbfc50546640 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -379,8 +379,13 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False) -> bytes: + enable_best_effort_determinism=False, + enable_lambda_name=False) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" + if enable_lambda_name: + logging.info( + 'Ignoring unsupported option: enable_lambda_name. 
' + 'This has only been implemented for CloudPickle.') with _pickle_lock: if enable_best_effort_determinism: old_save_set = dill.dill.Pickler.dispatch[set] diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 256f88c5453f..d43659df1b54 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -42,13 +42,15 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False) -> bytes: + enable_best_effort_determinism=False, + enable_lambda_name=False) -> bytes: return desired_pickle_lib.dumps( o, enable_trace=enable_trace, use_zlib=use_zlib, - enable_best_effort_determinism=enable_best_effort_determinism) + enable_best_effort_determinism=enable_best_effort_determinism, + enable_lambda_name=enable_lambda_name) def loads(encoded, enable_trace=True, use_zlib=False): diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 7048f680de87..1f4d15696338 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -34,6 +34,10 @@ from apache_beam.internal.pickler import loads +def pickle_depickle(obj, enable_lambda_name): + return loads(dumps(obj, enable_lambda_name=enable_lambda_name)) + + class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") @@ -279,5 +283,17 @@ def test_disable_best_effort_determinism(self): dumps(set2, enable_best_effort_determinism=False)) + def test_enable_lambda_name_pickling(self): + pickler.set_library('cloudpickle') + pickled = pickle_depickle(lambda x: x, enable_lambda_name=True) + pickled_type = type(pickled) + self.assertIsInstance(pickled, pickled_type) + + def test_disable_lambda_name_pickling(self): + pickler.set_library('cloudpickle') + pickled = pickle_depickle(lambda x: x, enable_lambda_name=False) + pickled_type = type(pickled) + 
self.assertIsInstance(pickled, pickled_type) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 132a1aedca33..459c0066c3e9 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -227,6 +227,7 @@ def __init__( self.iterable_state_write = iterable_state_write self._requirements = set(requirements) self.enable_best_effort_deterministic_pickling = False + self.enable_lambda_name_pickling = False def add_requirement(self, requirement: str) -> None: self._requirements.add(requirement) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index d2cf836713fb..d61cb02a419c 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -792,6 +792,8 @@ def to_runner_api_pickled(self, context): self, enable_best_effort_determinism=context. enable_best_effort_deterministic_pickling, + enable_lambda_name=context. 
+ enable_lambda_name_pickling, ), ) From d1722992ce75b92849af716028a38b76ddc97a62 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 10:33:13 -0400 Subject: [PATCH 02/15] add enable_lambda_name to __init__ --- sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 46d2cf01f6ae..dd0a732cca32 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -1350,6 +1350,7 @@ def __init__( self.globals_ref = {} self.proto = int(protocol) self.config = config + self.enable_lambda_name=enable_lambda_name if not PYPY: # pickle.Pickler is the C implementation of the CPython pickler and From 1f9725e17cb35eab91d85d7a1b7d61fb45317d2c Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 19 Aug 2025 11:43:32 -0400 Subject: [PATCH 03/15] fix formatting and lint --- .../internal/cloudpickle/cloudpickle.py | 16 ++++++++++++---- .../apache_beam/internal/cloudpickle_pickler.py | 2 +- .../apache_beam/internal/code_object_pickler.py | 7 +++---- sdks/python/apache_beam/internal/pickler_test.py | 2 +- sdks/python/apache_beam/transforms/ptransform.py | 3 +-- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index dd0a732cca32..61c319b5fcf5 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -78,6 +78,9 @@ import warnings import weakref +from apache_beam.internal.code_object_pickler import get_code_from_identifier +from apache_beam.internal.code_object_pickler import get_code_object_identifier + # The following import is required to be imported in the cloudpickle # namespace to be able to load 
pickle files generated with older versions of # cloudpickle. See: tests/test_backward_compat.py @@ -1272,7 +1275,7 @@ def _dynamic_function_reduce(self, func): newargs = self._function_getnewargs(func) state = _function_getstate(func) if type(newargs[0]) == str: - make_function - _make_function_from_identifier + make_function = _make_function_from_identifier else: make_function = _make_function return (make_function, newargs, state, None, None, _function_setstate) @@ -1292,7 +1295,8 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): - code_path = get_code_object_indentifier(func) if self.enable_lambda_name else None + code_path = get_code_object_indentifier( + func) if self.enable_lambda_name else None code = func.__code__ # base_globals represents the future global namespace of func at @@ -1339,7 +1343,11 @@ def dump(self, obj): raise def __init__( - self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG, + self, + file, + protocol=None, + buffer_callback=None, + config=DEFAULT_CONFIG, enable_lambda_name=False): if protocol is None: protocol = DEFAULT_PROTOCOL @@ -1350,7 +1358,7 @@ def __init__( self.globals_ref = {} self.proto = int(protocol) self.config = config - self.enable_lambda_name=enable_lambda_name + self.enable_lambda_name = enable_lambda_name if not PYPY: # pickle.Pickler is the C implementation of the CPython pickler and diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 757d6a19682c..b9affffdc5c3 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -129,7 +129,7 @@ def dumps( with _pickle_lock: with io.BytesIO() as file: pickler = cloudpickle.CloudPickler( - file, config=config, enable_lambda_name=enable_lambda_name) + file, config=config, enable_lambda_name=enable_lambda_name) try: 
pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index 0d0f4a3005fe..f1e035c938e0 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -19,7 +19,7 @@ This module provides helper functions to improve pickling code objects, especially lambdas, in a consistent way by using code object identifiers. These -helper functions will be used to patch pickler implementations used by Beam +helper functions are used to patch pickler implementations used by Beam (e.g. Cloudpickle). A code object identifier is a unique identifier for a code object that provides @@ -476,9 +476,8 @@ def _get_arg_count(obj: types.CodeType): code object. """ return ( - obj.co_argcount - + obj.co_kwonlyargcount - + (obj.co_flags & 4 == 4) # PyCF_VARARGS + obj.co_argcount + obj.co_kwonlyargcount + + (obj.co_flags & 4 == 4) # PyCF_VARARGS + (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS ) diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 1f4d15696338..33408f9a39dc 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -282,7 +282,6 @@ def test_disable_best_effort_determinism(self): dumps(set1, enable_best_effort_determinism=False), dumps(set2, enable_best_effort_determinism=False)) - def test_enable_lambda_name_pickling(self): pickler.set_library('cloudpickle') pickled = pickle_depickle(lambda x: x, enable_lambda_name=True) @@ -295,5 +294,6 @@ def test_disable_lambda_name_pickling(self): pickled_type = type(pickled) self.assertIsInstance(pickled, pickled_type) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 
d61cb02a419c..8dd5641f41c9 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -792,8 +792,7 @@ def to_runner_api_pickled(self, context): self, enable_best_effort_determinism=context. enable_best_effort_deterministic_pickling, - enable_lambda_name=context. - enable_lambda_name_pickling, + enable_lambda_name=context.enable_lambda_name_pickling, ), ) From a9e6652cc06ee2cd10a1b98f5ab064e217659160 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 12:03:27 -0400 Subject: [PATCH 04/15] fix typo --- sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 61c319b5fcf5..3cdae6105767 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -1295,7 +1295,7 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): - code_path = get_code_object_indentifier( + code_path = get_code_object_identifier( func) if self.enable_lambda_name else None code = func.__code__ From c5fa8318772c4d10c281b7f732b7e765dfcfaab8 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 13:46:12 -0400 Subject: [PATCH 05/15] fix code paths in test --- .../internal/code_object_pickler_test.py | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 2060533e9328..df2dfff1d889 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,30 +126,33 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() +prefix = 
("__main__" if __name__ == "__main__" else + "apache_beam.internal.code_object_pickler_test") + test_cases = [ ( top_level_function, - "apache_beam.internal.code_object_pickler_test.top_level_function" + f"{prefix}.top_level_function" ".__code__"), ( top_level_lambda, - "apache_beam.internal.code_object_pickler_test.top_level_lambda" + f"{prefix}.top_level_lambda" ".__code__"), ( get_nested_function(), ( - "apache_beam.internal.code_object_pickler_test.get_nested_function" + f"{prefix}.get_nested_function" ".__code__.co_consts[nested_function]")), ( get_lambda_from_dictionary(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".get_lambda_from_dictionary.__code__.co_consts[, ('x',)]") ), ( get_lambda_from_dictionary_same_args(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".get_lambda_from_dictionary_same_args.__code__.co_consts" "[, ('x',), " + hashlib.md5( get_lambda_from_dictionary_same_args().__code__.co_code). @@ -157,51 +160,51 @@ def get_lambda_from_dictionary(): ( function_with_lambda_default_argument(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".function_with_lambda_default_argument.__defaults__[0].__code__")), ( function_with_function_default_argument(), - "apache_beam.internal.code_object_pickler_test.top_level_function" + f"{prefix}.top_level_function" ".__code__"), ( add_one, - "apache_beam.internal.code_object_pickler_test.function_decorator" + f"{prefix}.function_decorator" ".__code__.co_consts[]"), ( ClassWithFunction.process, - "apache_beam.internal.code_object_pickler_test.ClassWithFunction" + f"{prefix}.ClassWithFunction" ".process.__code__"), ( ClassWithStaticMethod.static_method, - "apache_beam.internal.code_object_pickler_test.ClassWithStaticMethod" + f"{prefix}.ClassWithStaticMethod" ".static_method.__code__"), ( ClassWithClassMethod.class_method, - "apache_beam.internal.code_object_pickler_test.ClassWithClassMethod" + f"{prefix}.ClassWithClassMethod" ".class_method.__code__"), ( 
ClassWithNestedFunction().process(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".ClassWithNestedFunction.process.__code__.co_consts" "[nested_function]")), ( ClassWithLambda().process(), - "apache_beam.internal.code_object_pickler_test.ClassWithLambda.process" + f"{prefix}.ClassWithLambda.process" ".__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - "apache_beam.internal.code_object_pickler_test.ClassWithNestedClass" + f"{prefix}ClassWithNestedClass" ".InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( - "apache_beam.internal.code_object_pickler_test" + f"{prefix} ".ClassWithNestedLambda.process.__code__.co_consts" "[get_lambda_from_dictionary].co_consts[, ('x',)]")), ( ClassWithNestedLambda.process, - "apache_beam.internal.code_object_pickler_test.ClassWithNestedLambda" + f"{prefix}.ClassWithNestedLambda" ".process.__code__"), ] @@ -225,35 +228,35 @@ def test_roundtrip(self, callable, unused_path): class GetCodeFromCodeObjectIdentifierTest(unittest.TestCase): - def empty_path_raises_exception(self): + def test_empty_path_raises_exception(self): with self.assertRaisesRegex(ValueError, "Path must not be empty"): - code_object_pickler.test_get_code_from_identifier("") + code_object_pickler.get_code_from_identifier("") - def invalid_default_index_raises_exception(self): + def test_invalid_default_index_raises_exception(self): with self.assertRaisesRegex(ValueError, "out of bounds"): - code_object_pickler.test_get_code_from_identifier( - "apache_beam.internal.test_cases.module_with_default_argument." + code_object_pickler.get_code_from_identifier( + "apache_beam.internal.test_data.module_with_default_argument." 
"function_with_lambda_default_argument.__defaults__[1]") - def invalid_single_name_path_raises_exception(self): + def test_invalid_single_name_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." "my_function.__code__.co_consts[something]") - def invalid_lambda_with_args_path_raises_exception(self): + def test_invalid_lambda_with_args_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." "my_function.__code__.co_consts[, ('x',)]") - def invalid_lambda_with_hash_path_raises_exception(self): + def test_invalid_lambda_with_hash_path_raises_exception(self): with self.assertRaisesRegex(AttributeError, "Could not find code object with path"): code_object_pickler.get_code_from_identifier( - "apache_beam.internal.test_cases.module_3." + "apache_beam.internal.test_data.module_3." 
"my_function.__code__.co_consts[, ('',), 1234567890]") def test_adding_local_variable_in_class_preserves_object(self): From c32c32f7bb6cf1e06a3f173af4268493d1c1ea5d Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 14:49:27 -0400 Subject: [PATCH 06/15] fix tests --- .../internal/code_object_pickler_test.py | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index df2dfff1d889..5857e1911f37 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -286,12 +286,14 @@ def test_adding_lambda_variable_in_class_preserves_object(self): module_2_modified.AddLambdaVariable.my_method(self).__code__, ) - def test_removing_lambda_variable_in_class_changes_object(self): - with self.assertRaisesRegex(AttributeError, "object has no attribute"): - code_object_pickler.get_code_from_identifier( - code_object_pickler.get_code_object_identifier( - module_2.RemoveLambdaVariable.my_method(self)).replace( - "module_2", "module_2_modified")) + def test_removing_lambda_variable_in_class_preserves_object(self): + self.assertEqual( + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_2.RemoveLambdaVariable.my_method(self)).replace( + "module_2", "module_2_modified")), + module_2_modified.RemoveLambdaVariable.my_method(self).__code__, + ) def test_adding_nested_function_in_class_preserves_object(self): self.assertEqual( @@ -403,11 +405,15 @@ def test_adding_lambda_variable_in_function_preserves_object(self): module_1_lambda_variable_added.my_function().__code__, ) - def test_removing_lambda_variable_in_function_raises_exception(self): - with self.assertRaisesRegex(AttributeError, "object has no attribute"): - code_object_pickler.get_code_from_identifier( - 
code_object_pickler.get_code_object_identifier( - module_3.my_function()).replace("module_3", "module_3_modified")) + def test_removing_lambda_variable_in_function_preserves_object(self): + self.assertEqual( + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_3.my_function() + ).replace("module_3", "module_3_modified") + ), + module_3_modified.my_function().__code__, + ) class CodePathStabilityTest(unittest.TestCase): From df614e0985846f607e3e78ad73c06f2df6405253 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 15:49:45 -0400 Subject: [PATCH 07/15] fix lint --- .../internal/code_object_pickler_test.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 5857e1911f37..80f53e20c1a9 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -146,13 +146,13 @@ def get_lambda_from_dictionary(): ( get_lambda_from_dictionary(), ( - f"{prefix} + f"{prefix}" ".get_lambda_from_dictionary.__code__.co_consts[, ('x',)]") ), ( get_lambda_from_dictionary_same_args(), ( - f"{prefix} + f"{prefix}" ".get_lambda_from_dictionary_same_args.__code__.co_consts" "[, ('x',), " + hashlib.md5( get_lambda_from_dictionary_same_args().__code__.co_code). 
@@ -160,7 +160,7 @@ def get_lambda_from_dictionary(): ( function_with_lambda_default_argument(), ( - f"{prefix} + f"{prefix}" ".function_with_lambda_default_argument.__defaults__[0].__code__")), ( function_with_function_default_argument(), @@ -185,27 +185,23 @@ def get_lambda_from_dictionary(): ( ClassWithNestedFunction().process(), ( - f"{prefix} - ".ClassWithNestedFunction.process.__code__.co_consts" + f"{prefix}.ClassWithNestedFunction.process.__code__.co_consts" "[nested_function]")), ( ClassWithLambda().process(), - f"{prefix}.ClassWithLambda.process" - ".__code__.co_consts[]"), + f"{prefix}.ClassWithLambda.process.__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - f"{prefix}ClassWithNestedClass" - ".InnerClass.process.__code__"), + f"{prefix}ClassWithNestedClass.InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( - f"{prefix} + f"{prefix}" ".ClassWithNestedLambda.process.__code__.co_consts" "[get_lambda_from_dictionary].co_consts[, ('x',)]")), ( ClassWithNestedLambda.process, - f"{prefix}.ClassWithNestedLambda" - ".process.__code__"), + f"{prefix}.ClassWithNestedLambda.process.__code__"), ] From 261104a223a3eb91c152248686e0f4b3608028e0 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Wed, 20 Aug 2025 16:44:54 -0400 Subject: [PATCH 08/15] fix formatting and failing test --- .../internal/code_object_pickler_test.py | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 80f53e20c1a9..ce42407205ed 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,21 +126,17 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() -prefix = ("__main__" if __name__ == "__main__" else - "apache_beam.internal.code_object_pickler_test") +prefix = ( + "__main__" if __name__ 
== "__main__" else + "apache_beam.internal.code_object_pickler_test") test_cases = [ - ( - top_level_function, - f"{prefix}.top_level_function" + (top_level_function, f"{prefix}.top_level_function" ".__code__"), - ( - top_level_lambda, - f"{prefix}.top_level_lambda" + (top_level_lambda, f"{prefix}.top_level_lambda" ".__code__"), ( - get_nested_function(), - ( + get_nested_function(), ( f"{prefix}.get_nested_function" ".__code__.co_consts[nested_function]")), ( @@ -166,9 +162,7 @@ def get_lambda_from_dictionary(): function_with_function_default_argument(), f"{prefix}.top_level_function" ".__code__"), - ( - add_one, - f"{prefix}.function_decorator" + (add_one, f"{prefix}.function_decorator" ".__code__.co_consts[]"), ( ClassWithFunction.process, @@ -192,7 +186,7 @@ def get_lambda_from_dictionary(): f"{prefix}.ClassWithLambda.process.__code__.co_consts[]"), ( ClassWithNestedClass.InnerClass().process, - f"{prefix}ClassWithNestedClass.InnerClass.process.__code__"), + f"{prefix}.ClassWithNestedClass.InnerClass.process.__code__"), ( ClassWithNestedLambda().process(), ( @@ -405,9 +399,8 @@ def test_removing_lambda_variable_in_function_preserves_object(self): self.assertEqual( code_object_pickler.get_code_from_identifier( code_object_pickler.get_code_object_identifier( - module_3.my_function() - ).replace("module_3", "module_3_modified") - ), + module_3.my_function()).replace( + "module_3", "module_3_modified")), module_3_modified.my_function().__code__, ) From d1618f4bbad2da81bfa1f26231973c6fe27e0117 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Thu, 21 Aug 2025 13:22:38 -0400 Subject: [PATCH 09/15] fix formatting again --- .../apache_beam/internal/code_object_pickler_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index ce42407205ed..25b41b2b9393 100644 --- 
a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -132,9 +132,9 @@ def get_lambda_from_dictionary(): test_cases = [ (top_level_function, f"{prefix}.top_level_function" - ".__code__"), + ".__code__"), (top_level_lambda, f"{prefix}.top_level_lambda" - ".__code__"), + ".__code__"), ( get_nested_function(), ( f"{prefix}.get_nested_function" @@ -163,7 +163,7 @@ def get_lambda_from_dictionary(): f"{prefix}.top_level_function" ".__code__"), (add_one, f"{prefix}.function_decorator" - ".__code__.co_consts[]"), + ".__code__.co_consts[]"), ( ClassWithFunction.process, f"{prefix}.ClassWithFunction" @@ -400,7 +400,7 @@ def test_removing_lambda_variable_in_function_preserves_object(self): code_object_pickler.get_code_from_identifier( code_object_pickler.get_code_object_identifier( module_3.my_function()).replace( - "module_3", "module_3_modified")), + "module_3", "module_3_modified")), module_3_modified.my_function().__code__, ) From 66ac9b15ed699be06529ca16b2ff287d6f5a14f5 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 10:22:58 -0400 Subject: [PATCH 10/15] remove cloudpickle implementation to leave only typo fixes and fixing test structure. 
--- .../internal/cloudpickle/cloudpickle.py | 27 ++----------------- .../internal/cloudpickle_pickler.py | 4 +-- .../internal/code_object_pickler.py | 22 ++++----------- .../apache_beam/internal/dill_pickler.py | 7 +---- sdks/python/apache_beam/internal/pickler.py | 6 ++--- .../apache_beam/internal/pickler_test.py | 16 ----------- .../apache_beam/runners/pipeline_context.py | 1 - .../apache_beam/transforms/ptransform.py | 1 - 8 files changed, 11 insertions(+), 73 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index 3cdae6105767..e74847587578 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -78,9 +78,6 @@ import warnings import weakref -from apache_beam.internal.code_object_pickler import get_code_from_identifier -from apache_beam.internal.code_object_pickler import get_code_object_identifier - # The following import is required to be imported in the cloudpickle # namespace to be able to load pickle files generated with older versions of # cloudpickle. 
See: tests/test_backward_compat.py @@ -529,11 +526,6 @@ def _make_function(code, globals, name, argdefs, closure): return types.FunctionType(code, globals, name, argdefs, closure) -def _make_function_from_identifier(code_path, globals, name, argdefs, closure): - fcode = get_code_from_identifier(code_path) - return _make_function(fcode, globals, name, argdefs, closure) - - def _make_empty_cell(): if False: # trick the compiler into creating an empty cell in our lambda @@ -1274,10 +1266,6 @@ def _dynamic_function_reduce(self, func): """Reduce a function that is not pickleable via attribute lookup.""" newargs = self._function_getnewargs(func) state = _function_getstate(func) - if type(newargs[0]) == str: - make_function = _make_function_from_identifier - else: - make_function = _make_function return (make_function, newargs, state, None, None, _function_setstate) def _function_reduce(self, obj): @@ -1295,8 +1283,6 @@ def _function_reduce(self, obj): return self._dynamic_function_reduce(obj) def _function_getnewargs(self, func): - code_path = get_code_object_identifier( - func) if self.enable_lambda_name else None code = func.__code__ # base_globals represents the future global namespace of func at @@ -1327,10 +1313,7 @@ def _function_getnewargs(self, func): else: closure = tuple(_make_empty_cell() for _ in range(len(code.co_freevars))) - if code_path: - return code_path, base_globals, None, None, closure - else: - return code, base_globals, None, None, closure + return code, base_globals, None, None, closure def dump(self, obj): try: @@ -1343,12 +1326,7 @@ def dump(self, obj): raise def __init__( - self, - file, - protocol=None, - buffer_callback=None, - config=DEFAULT_CONFIG, - enable_lambda_name=False): + self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG): if protocol is None: protocol = DEFAULT_PROTOCOL super().__init__(file, protocol=protocol, buffer_callback=buffer_callback) @@ -1358,7 +1336,6 @@ def __init__( self.globals_ref = {} self.proto 
= int(protocol) self.config = config - self.enable_lambda_name = enable_lambda_name if not PYPY: # pickle.Pickler is the C implementation of the CPython pickler and diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index b9affffdc5c3..63038e770f27 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -117,7 +117,6 @@ def dumps( enable_trace=True, use_zlib=False, enable_best_effort_determinism=False, - enable_lambda_name=False, config: cloudpickle.CloudPickleConfig = DEFAULT_CONFIG) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" if enable_best_effort_determinism: @@ -128,8 +127,7 @@ def dumps( 'This has only been implemented for dill.') with _pickle_lock: with io.BytesIO() as file: - pickler = cloudpickle.CloudPickler( - file, config=config, enable_lambda_name=enable_lambda_name) + pickler = cloudpickle.CloudPickler(file, config=config) try: pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index f1e035c938e0..81bdfafbfd7a 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -19,7 +19,7 @@ This module provides helper functions to improve pickling code objects, especially lambdas, in a consistent way by using code object identifiers. These -helper functions are used to patch pickler implementations used by Beam +helper functions will be used to patch pickler implementations used by Beam (e.g. Cloudpickle). 
A code object identifier is a unique identifier for a code object that provides @@ -375,7 +375,7 @@ def _get_code_object_from_lambda_with_args_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) - if obj_.co_varnames[:_get_arg_count(obj_)] == args: + if obj_.co_varnames == args: return obj_ raise AttributeError(f'Could not find code object with path: {path}') @@ -404,7 +404,7 @@ def _get_code_object_from_lambda_with_hash_pattern( for obj_ in objects: args = tuple( re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) - if obj_.co_varnames[:_get_arg_count(obj_)] == args: + if obj_.co_varnames == args: hash_value = lambda_with_hash_result.group(3) if hash_value == str(_create_bytecode_hash(obj_)): return obj_ @@ -462,24 +462,12 @@ def _signature(obj: types.CodeType): Returns: A tuple of the names of the arguments of the code object. """ - return obj.co_varnames[:_get_arg_count(obj)] - - -def _get_arg_count(obj: types.CodeType): - """Returns the number of arguments of a code object. - - Args: - obj: A code object, function, method, or cell. - - Returns: - The number of arguments of the code object, or None if the object is not a - code object. 
- """ - return ( + arg_count = ( obj.co_argcount + obj.co_kwonlyargcount + (obj.co_flags & 4 == 4) # PyCF_VARARGS + (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS ) + return obj.co_varnames[:arg_count] def _create_bytecode_hash(code_object: types.CodeType): diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index bbfc50546640..9a3d43826610 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -379,13 +379,8 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False, - enable_lambda_name=False) -> bytes: + enable_best_effort_determinism=False) -> bytes: """For internal use only; no backwards-compatibility guarantees.""" - if enable_lambda_name: - logging.info( - 'Ignoring unsupported option: enable_lambda_name. ' - 'This has only been implemented for CloudPickle.') with _pickle_lock: if enable_best_effort_determinism: old_save_set = dill.dill.Pickler.dispatch[set] diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index d43659df1b54..256f88c5453f 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -42,15 +42,13 @@ def dumps( o, enable_trace=True, use_zlib=False, - enable_best_effort_determinism=False, - enable_lambda_name=False) -> bytes: + enable_best_effort_determinism=False) -> bytes: return desired_pickle_lib.dumps( o, enable_trace=enable_trace, use_zlib=use_zlib, - enable_best_effort_determinism=enable_best_effort_determinism, - enable_lambda_name=enable_lambda_name) + enable_best_effort_determinism=enable_best_effort_determinism) def loads(encoded, enable_trace=True, use_zlib=False): diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 33408f9a39dc..7048f680de87 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ 
b/sdks/python/apache_beam/internal/pickler_test.py @@ -34,10 +34,6 @@ from apache_beam.internal.pickler import loads -def pickle_depickle(obj, enable_lambda_name): - return loads(dumps(obj, enable_lambda_name=enable_lambda_name)) - - class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") @@ -282,18 +278,6 @@ def test_disable_best_effort_determinism(self): dumps(set1, enable_best_effort_determinism=False), dumps(set2, enable_best_effort_determinism=False)) - def test_enable_lambda_name_pickling(self): - pickler.set_library('cloudpickle') - pickled = pickle_depickle(lambda x: x, enable_lambda_name=True) - pickled_type = type(pickled) - self.assertIsInstance(pickled, pickled_type) - - def test_disable_lambda_name_pickling(self): - pickler.set_library('cloudpickle') - pickled = pickle_depickle(lambda x: x, enable_lambda_name=False) - pickled_type = type(pickled) - self.assertIsInstance(pickled, pickled_type) - if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 459c0066c3e9..132a1aedca33 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -227,7 +227,6 @@ def __init__( self.iterable_state_write = iterable_state_write self._requirements = set(requirements) self.enable_best_effort_deterministic_pickling = False - self.enable_lambda_name_pickling = False def add_requirement(self, requirement: str) -> None: self._requirements.add(requirement) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 8dd5641f41c9..d2cf836713fb 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -792,7 +792,6 @@ def to_runner_api_pickled(self, context): self, enable_best_effort_determinism=context. 
enable_best_effort_deterministic_pickling, - enable_lambda_name=context.enable_lambda_name_pickling, ), ) From 348c893ccc3d7135fdd2ae54b28652e265036f5f Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 10:26:22 -0400 Subject: [PATCH 11/15] fix _make_function typo --- sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py index e74847587578..5a9d89430fd3 100644 --- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py +++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py @@ -1266,7 +1266,7 @@ def _dynamic_function_reduce(self, func): """Reduce a function that is not pickleable via attribute lookup.""" newargs = self._function_getnewargs(func) state = _function_getstate(func) - return (make_function, newargs, state, None, None, _function_setstate) + return (_make_function, newargs, state, None, None, _function_setstate) def _function_reduce(self, obj): """Reducer for function objects. 
From dbf26118251cdd791042766fb8e1d4af92a28585 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 10:38:30 -0400 Subject: [PATCH 12/15] revert regex --- sdks/python/apache_beam/internal/code_object_pickler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler.py b/sdks/python/apache_beam/internal/code_object_pickler.py index 81bdfafbfd7a..b6ea015cc06f 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler.py +++ b/sdks/python/apache_beam/internal/code_object_pickler.py @@ -315,10 +315,10 @@ def _search_lambda( _SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') # Matches a path like: co_consts[<lambda>, ('x',)] _LAMBDA_WITH_ARGS_PATTERN = re.compile( - r"co_consts\[(<.*?>),\s(\('[^']+'(?:,\s*'[^']+')*,?\))\]") + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") # Matches a path like: co_consts[<lambda>, ('x',), 1234567890] _LAMBDA_WITH_HASH_PATTERN = re.compile( - r"co_consts\[(<[^>]+>),\s*(\([^\)]*\)),?\s*(.*)\]") + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") # Matches a path like: __defaults__[0] _DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') # Matches an argument like: 'x' From ecc37bbfae0077006dc2f3b5b5c85a6cb9e37283 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 11:57:03 -0400 Subject: [PATCH 13/15] fix failing tests --- .../internal/code_object_pickler_test.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 25b41b2b9393..e3b50d28f368 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -276,14 +276,13 @@ def test_adding_lambda_variable_in_class_preserves_object(self): module_2_modified.AddLambdaVariable.my_method(self).__code__, ) - def 
test_removing_lambda_variable_in_class_preserves_object(self): - self.assertEqual( - code_object_pickler.get_code_from_identifier( - code_object_pickler.get_code_object_identifier( - module_2.RemoveLambdaVariable.my_method(self)).replace( - "module_2", "module_2_modified")), - module_2_modified.RemoveLambdaVariable.my_method(self).__code__, - ) + def test_removing_lambda_variable_in_class_changes_object(self): + with self.assertRaisesRegex(AttributeError, "object has no attribute"): + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_2.RemoveLambdaVariable.my_method(self)).replace( + "module_2", "module_2_modified")) + def test_adding_nested_function_in_class_preserves_object(self): self.assertEqual( @@ -395,14 +394,11 @@ def test_adding_lambda_variable_in_function_preserves_object(self): module_1_lambda_variable_added.my_function().__code__, ) - def test_removing_lambda_variable_in_function_preserves_object(self): - self.assertEqual( - code_object_pickler.get_code_from_identifier( - code_object_pickler.get_code_object_identifier( - module_3.my_function()).replace( - "module_3", "module_3_modified")), - module_3_modified.my_function().__code__, - ) + def test_removing_lambda_variable_in_function_raises_exception(self): + with self.assertRaisesRegex(AttributeError, "object has no attribute"): + code_object_pickler.get_code_from_identifier( + code_object_pickler.get_code_object_identifier( + module_3.my_function()).replace("module_3", "module_3_modified")) class CodePathStabilityTest(unittest.TestCase): From adb3e9d2189be7a48734ab9802bd6461bfa50fa7 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 13:39:32 -0400 Subject: [PATCH 14/15] fix formatting --- sdks/python/apache_beam/internal/code_object_pickler_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 
e3b50d28f368..8e22056d9688 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -283,7 +283,6 @@ def test_removing_lambda_variable_in_class_changes_object(self): module_2.RemoveLambdaVariable.my_method(self)).replace( "module_2", "module_2_modified")) - def test_adding_nested_function_in_class_preserves_object(self): self.assertEqual( code_object_pickler.get_code_from_identifier( From 6bbe234d3a90871a6cfb66d6b5b5444379197b46 Mon Sep 17 00:00:00 2001 From: kristynsmith Date: Tue, 26 Aug 2025 15:48:53 -0400 Subject: [PATCH 15/15] update prefix to not hardcode --- sdks/python/apache_beam/internal/code_object_pickler_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/code_object_pickler_test.py b/sdks/python/apache_beam/internal/code_object_pickler_test.py index 8e22056d9688..de01f16fd0a7 100644 --- a/sdks/python/apache_beam/internal/code_object_pickler_test.py +++ b/sdks/python/apache_beam/internal/code_object_pickler_test.py @@ -126,9 +126,7 @@ def get_lambda_from_dictionary(): return get_lambda_from_dictionary() -prefix = ( - "__main__" if __name__ == "__main__" else - "apache_beam.internal.code_object_pickler_test") +prefix = __name__ test_cases = [ (top_level_function, f"{prefix}.top_level_function"