diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index ed7f7f1e3863..0b759d797ee8 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,6 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
   "https://github.com/apache/beam/pull/32440": "test new datastream runner for batch"
-  "modification": 9
+  "modification": 10
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
index 1012782ae381..12b91fa89ae4 100644
--- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }
\ No newline at end of file
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index bb11e2f79c20..afdc7f7012a8 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 10
+  "modification": 11
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e0266d62f2e0..f1ba03a243ee 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 5
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
index b26833333238..c537844dc84a 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index c577bd3d4a25..256f88c5453f 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -33,9 +33,9 @@
 USE_CLOUDPICKLE = 'cloudpickle'
 USE_DILL = 'dill'
 
-DEFAULT_PICKLE_LIB = USE_DILL
-desired_pickle_lib = dill_pickler
+DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE
+desired_pickle_lib = cloudpickle_pickler
 
 
 def dumps(
diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py
index 60fa1e075522..c8c1dd5e5f7a 100644
--- a/sdks/python/apache_beam/internal/pickler_test.py
+++ b/sdks/python/apache_beam/internal/pickler_test.py
@@ -25,7 +25,11 @@
 import types
 import unittest
 
+from parameterized import param
+from parameterized import parameterized
+
 from apache_beam.internal import module_test
+from apache_beam.internal import pickler
 from apache_beam.internal.pickler import dumps
 from apache_beam.internal.pickler import loads
 
@@ -34,13 +38,24 @@ class PicklerTest(unittest.TestCase):
 
   NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
 
-  def test_basics(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_basics(self, pickle_lib):
+    pickler.set_library(pickle_lib)
+
     self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
     fun = lambda x: 'xyz-%s' % x
     self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))
 
-  def test_lambda_with_globals(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_globals(self, pickle_lib):
     """Tests that the globals of a function are preserved."""
+    pickler.set_library(pickle_lib)
 
     # The point of the test is that the lambda being called after unpickling
     # relies on having the re module being loaded.
@@ -48,60 +63,115 @@ def test_lambda_with_globals(self):
         loads(dumps(
             module_test.get_lambda_with_globals()))('abc def'))
 
-  def test_lambda_with_main_globals(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_main_globals(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     self.assertEqual(unittest, loads(dumps(lambda: unittest))())
 
-  def test_lambda_with_closure(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_closure(self, pickle_lib):
     """Tests that the closure of a function is preserved."""
+    pickler.set_library(pickle_lib)
     self.assertEqual(
         'closure: abc',
         loads(dumps(module_test.get_lambda_with_closure('abc')))())
 
-  def test_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_class(self, pickle_lib):
     """Tests that a class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
     self.assertEqual(['abc', 'def'],
                      loads(dumps(module_test.Xyz))().foo('abc def'))
 
-  def test_object(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_object(self, pickle_lib):
     """Tests that a class instance is pickled correctly."""
+    pickler.set_library(pickle_lib)
     self.assertEqual(['abc', 'def'],
                      loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
 
-  def test_nested_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_nested_class(self, pickle_lib):
     """Tests that a nested class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
     self.assertEqual(
         'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
     self.assertEqual(
         'Y:abc',
         loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
 
-  def test_dynamic_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dynamic_class(self, pickle_lib):
     """Tests that a nested class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
     self.assertEqual(
         'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
 
-  def test_generators(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_generators(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     with self.assertRaises(TypeError):
       dumps((_ for _ in range(10)))
 
-  def test_recursive_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_recursive_class(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     self.assertEqual(
         'RecursiveClass:abc',
         loads(dumps(module_test.RecursiveClass('abc').datum)))
 
-  def test_pickle_rlock(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_pickle_rlock(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     rlock_instance = threading.RLock()
     rlock_type = type(rlock_instance)
     self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)
 
-  def test_save_paths(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_save_paths(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     f = loads(dumps(lambda x: x))
     co_filename = f.__code__.co_filename
     self.assertTrue(co_filename.endswith('pickler_test.py'))
 
   @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
-  def test_dump_and_load_mapping_proxy(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dump_and_load_mapping_proxy(self, pickle_lib):
+    pickler.set_library(pickle_lib)
     self.assertEqual(
         'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
     self.assertEqual(
@@ -109,7 +179,11 @@ def test_dump_and_load_mapping_proxy(self):
 
   # pylint: disable=exec-used
   @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only')
-  def test_dataclass(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dataclass(self, pickle_lib):
     exec(
         '''
 from apache_beam.internal.module_test import DataClass
@@ -141,6 +215,7 @@ def maybe_get_sets_with_different_iteration_orders(self):
     return set1, set2
 
   def test_best_effort_determinism(self):
+    pickler.set_library('dill')
     set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
     self.assertEqual(
         dumps(set1, enable_best_effort_determinism=True),
@@ -152,6 +227,7 @@ def test_best_effort_determinism(self):
     self.skipTest('Set iteration orders matched. Test results inconclusive.')
 
   def test_disable_best_effort_determinism(self):
+    pickler.set_library('dill')
     set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
     # The test relies on the sets having different iteration orders for the
     # elements. Iteration order is implementation dependent and undefined,
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index b91d27926c54..db03968a237d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -191,9 +191,6 @@ def __init__(
 
     FileSystems.set_options(self._options)
 
-    pickle_library = self._options.view_as(SetupOptions).pickle_library
-    pickler.set_library(pickle_library)
-
     if runner is None:
       runner = self._options.view_as(StandardOptions).runner
       if runner is None:
@@ -210,6 +207,17 @@ def __init__(
           'Runner %s is not a PipelineRunner object or the '
           'name of a registered runner.' % runner)
 
+    # Runner can override the default pickler to be used.
+    if (self._options.view_as(SetupOptions).pickle_library == 'default' and
+        runner.default_pickle_library_override()):
+      logging.info(
+          "Runner defaulting to pickling library: %s.",
+          runner.default_pickle_library_override())
+      self._options.view_as(
+          SetupOptions).pickle_library = runner.default_pickle_library_override(
+          )
+    pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
+
     # Validate pipeline options
     errors = PipelineOptionsValidator(self._options, runner).validate()
     if errors:
@@ -228,6 +236,7 @@ def __init__(
 
     # Default runner to be used.
     self.runner = runner
+
     # Stack of transforms generated by nested apply() calls. The stack will
     # contain a root node as an enclosing (parent) node for top transforms.
     self.transforms_stack = [
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 6480b2db3c86..52ecb1a1d575 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -41,6 +41,7 @@
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import TaggedOutput
+from apache_beam.runners.runner import PipelineRunner
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -156,6 +157,21 @@ def test_create(self):
     pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
     assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
 
+  @mock.patch('logging.info')
+  def test_runner_overrides_default_pickler(self, mock_info):
+    with mock.patch.object(PipelineRunner,
+                           'default_pickle_library_override') as mock_fn:
+      mock_fn.return_value = 'dill'
+      with TestPipeline() as pipeline:
+        pcoll = pipeline | 'label1' >> Create([1, 2, 3])
+        assert_that(pcoll, equal_to([1, 2, 3]))
+
+      from apache_beam.internal import pickler
+      from apache_beam.internal import dill_pickler
+      self.assertIs(pickler.desired_pickle_lib, dill_pickler)
+      mock_info.assert_any_call(
+          'Runner defaulting to pickling library: %s.', 'dill')
+
   def test_flatmap_builtin(self):
     with TestPipeline() as pipeline:
       pcoll = pipeline | 'label1' >> Create([1, 2, 3])
@@ -279,7 +295,7 @@ def test_no_wait_until_finish(self, mock_info):
     with Pipeline(runner='DirectRunner',
                   options=PipelineOptions(["--no_wait_until_finish"])) as p:
       _ = p | beam.Create(['test'])
-    mock_info.assert_called_once_with(
+    mock_info.assert_any_call(
         'Job execution continues without waiting for completion. '
' 'Use "wait_until_finish" in PipelineResult to block until finished.') p.result.wait_until_finish() @@ -287,9 +303,19 @@ def test_no_wait_until_finish(self, mock_info): def test_auto_unique_labels(self): opts = PipelineOptions(["--auto_unique_labels"]) - with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen: - mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')] - mock_uuid_gen.side_effect = mock_uuids + + mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')] + mock_uuid_gen = mock.Mock(side_effect=mock_uuids) + + original_generate_unique_label = Pipeline._generate_unique_label + + def patched_generate_unique_label(self, transform): + with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()): + return original_generate_unique_label(self, transform) + + with mock.patch.object(Pipeline, + '_generate_unique_label', + patched_generate_unique_label): with TestPipeline(options=opts) as pipeline: pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 860326d02d30..349fee3eff26 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -98,6 +98,9 @@ class DataflowRunner(PipelineRunner): def __init__(self, cache=None): self._default_environment = None + def default_pickle_library_override(self): + return 'cloudpickle' + def is_fnapi_compatible(self): return False diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py index 1af5f1bc7bea..92116c665b64 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -41,6 +41,15 @@ from apache_beam.testing import test_pipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils import shared + + +class DictWrapper(dict): + pass + + +def acquire_dict(): + return DictWrapper() class DirectPipelineResultTest(unittest.TestCase): @@ -159,18 +168,17 @@ def test_retry_fork_graph(self): # currently does not currently support retries. p = beam.Pipeline(runner='BundleBasedDirectRunner') - # TODO(mariagh): Remove the use of globals from the test. 
-    global count_b, count_c  # pylint: disable=global-variable-undefined
-    count_b, count_c = 0, 0
+    shared_handler = shared.Shared()
+    counts = shared_handler.acquire(acquire_dict)
+    counts['count_b'] = 0
+    counts['count_c'] = 0
 
     def f_b(x):
-      global count_b  # pylint: disable=global-variable-undefined
-      count_b += 1
+      shared_handler.acquire(acquire_dict)['count_b'] += 1
       raise Exception('exception in f_b')
 
     def f_c(x):
-      global count_c  # pylint: disable=global-variable-undefined
-      count_c += 1
+      shared_handler.acquire(acquire_dict)['count_c'] += 1
       raise Exception('exception in f_c')
 
     names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
@@ -180,7 +188,7 @@ def f_c(x):
     with self.assertRaises(Exception):
       p.run().wait_until_finish()
 
-    assert count_b == count_c == 4
+    assert counts['count_b'] == counts['count_c'] == 4
 
   def test_no_partial_writeouts(self):
     class TestTransformEvaluator(_TransformEvaluator):
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 1030e9fbd8b5..e3b7a9de9483 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -225,6 +225,10 @@ def check_requirements(
             beam_runner_api_pb2.TimeDomain.PROCESSING_TIME):
           raise NotImplementedError(timer.time_domain)
 
+  def default_pickle_library_override(self):
+    """Default pickle library, can be overridden by runner implementation."""
+    return None
+
 
 # FIXME: replace with PipelineState(str, enum.Enum)
 class PipelineState(object):
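
The crux of the patch is the new pickler-selection order: pipeline.py now asks the runner for a preference before locking in a backend, and DEFAULT_PICKLE_LIB flips from dill to cloudpickle. Below is a minimal, self-contained sketch of that resolution order; DataflowLikeRunner and resolve_pickle_library are hypothetical names for illustration, while default_pickle_library_override is the hook the patch adds.

    # Sketch of the selection logic pipeline.py now implements (assumed
    # simplification; the real code mutates SetupOptions.pickle_library).
    class PipelineRunner(object):
      def default_pickle_library_override(self):
        # Base behavior from runner.py: no opinion, keep the user's setting.
        return None


    class DataflowLikeRunner(PipelineRunner):
      def default_pickle_library_override(self):
        # Mirrors DataflowRunner, which now prefers cloudpickle.
        return 'cloudpickle'


    def resolve_pickle_library(user_choice, runner):
      # The override fires only when the user left --pickle_library at
      # 'default'; an explicit choice always wins.
      override = runner.default_pickle_library_override()
      if user_choice == 'default' and override:
        return override
      return user_choice


    assert resolve_pickle_library('default', DataflowLikeRunner()) == 'cloudpickle'
    assert resolve_pickle_library('dill', DataflowLikeRunner()) == 'dill'
    assert resolve_pickle_library('default', PipelineRunner()) == 'default'

Because the override path emits an extra logging.info call, test_no_wait_until_finish above also switches from assert_called_once_with to assert_any_call: the "Job execution continues" message is no longer guaranteed to be the only logging.info invocation.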
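The pickler_test.py changes all follow one pattern: each round-trip test is expanded to run once per backend via parameterized.expand, with pickler.set_library selecting the backend under test. A condensed, runnable version of that pattern follows; RoundTripTest is a hypothetical distillation, but pickler.set_library, dumps, and loads are the real Beam APIs the patch exercises.

    # Condensed version of the new test pattern in pickler_test.py.
    import unittest

    from parameterized import param
    from parameterized import parameterized

    from apache_beam.internal import pickler
    from apache_beam.internal.pickler import dumps
    from apache_beam.internal.pickler import loads


    class RoundTripTest(unittest.TestCase):
      @parameterized.expand([
          param(pickle_lib='dill'),
          param(pickle_lib='cloudpickle'),
      ])
      def test_round_trip(self, pickle_lib):
        # set_library swaps the module-level backend that dumps()/loads()
        # dispatch to, so the same assertions run once per pickler.
        pickler.set_library(pickle_lib)
        self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))


    if __name__ == '__main__':
      unittest.main()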
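direct_runner_test.py replaces module-level globals with apache_beam.utils.shared.Shared. The sketch below shows why the test wraps dict in a subclass: Shared.acquire holds only a weak reference to the constructed value, and built-in dict instances cannot be weak-referenced. It assumes the in-process BundleBasedDirectRunner, where every worker sees the same process-local cache.

    # Why the test defines DictWrapper rather than using a plain dict.
    from apache_beam.utils import shared


    class DictWrapper(dict):
      pass  # subclassing dict makes instances weak-referenceable


    def acquire_dict():
      return DictWrapper()


    handle = shared.Shared()
    counts = handle.acquire(acquire_dict)  # first call runs the constructor
    counts['count_b'] = 0
    # While a strong reference (counts) is alive, later acquire() calls
    # return the same cached object, giving the test a shared mutable
    # counter without globals.
    handle.acquire(acquire_dict)['count_b'] += 1
    assert counts['count_b'] == 1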
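Finally, the reworked test_auto_unique_labels narrows the scope of the uuid.uuid4 mock: instead of patching it for the whole pipeline run, where unrelated code may also draw UUIDs and exhaust the two-item side_effect list, it patches Pipeline._generate_unique_label and mocks uuid4 only inside that call. The general pattern, with a hypothetical Labeler standing in for Pipeline:

    # Scoping a mock of a widely used function to a single call site.
    import uuid
    from unittest import mock


    class Labeler(object):  # hypothetical stand-in for Pipeline
      def _generate_unique_label(self, transform):
        return '%s_%s' % (transform, uuid.uuid4().hex[0:10])


    mock_uuid_gen = mock.Mock(
        side_effect=[mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')])
    original = Labeler._generate_unique_label


    def patched(self, transform):
      # uuid4 is mocked only while the wrapped method runs.
      with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()):
        return original(self, transform)


    with mock.patch.object(Labeler, '_generate_unique_label', patched):
      assert Labeler()._generate_unique_label('pcoll') == 'pcoll_UUID01XXX'
      assert Labeler()._generate_unique_label('pcoll') == 'pcoll_UUID02XXX'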