2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
@@ -1,6 +1,6 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run.",
  "https://github.com/apache/beam/pull/32440": "test new datastream runner for batch",
-  "modification": 9
+  "modification": 10
}

@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
}
@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 10
+  "modification": 11
}
@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 5
}
@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
}
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/internal/pickler.py
@@ -33,9 +33,9 @@

USE_CLOUDPICKLE = 'cloudpickle'
USE_DILL = 'dill'
-DEFAULT_PICKLE_LIB = USE_DILL
+DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE

-desired_pickle_lib = dill_pickler
+desired_pickle_lib = cloudpickle_pickler


def dumps(
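
For anyone consuming this change: the default pickler flips from dill to cloudpickle, but an explicit --pickle_library pipeline option still wins. A minimal sketch of opting back into dill (the flag is Beam's standard SetupOptions option; the pipeline itself is illustrative):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Pipelines that depend on dill-specific serialization behavior can opt
# out of the new cloudpickle default explicitly.
options = PipelineOptions(['--pickle_library=dill'])

with beam.Pipeline(options=options) as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
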
104 changes: 90 additions & 14 deletions sdks/python/apache_beam/internal/pickler_test.py
@@ -25,7 +25,11 @@
import types
import unittest

+from parameterized import param
+from parameterized import parameterized
+
from apache_beam.internal import module_test
+from apache_beam.internal import pickler
from apache_beam.internal.pickler import dumps
from apache_beam.internal.pickler import loads

@@ -34,82 +38,152 @@ class PicklerTest(unittest.TestCase):

  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")

-  def test_basics(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_basics(self, pickle_lib):
+    pickler.set_library(pickle_lib)
+
    self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
    fun = lambda x: 'xyz-%s' % x
    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))

-  def test_lambda_with_globals(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_globals(self, pickle_lib):
    """Tests that the globals of a function are preserved."""
+    pickler.set_library(pickle_lib)
+
    # The point of the test is that the lambda being called after unpickling
    # relies on having the re module being loaded.
    self.assertEqual(['abc', 'def'],
                     loads(dumps(
                         module_test.get_lambda_with_globals()))('abc def'))

-  def test_lambda_with_main_globals(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_main_globals(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    self.assertEqual(unittest, loads(dumps(lambda: unittest))())

-  def test_lambda_with_closure(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_lambda_with_closure(self, pickle_lib):
    """Tests that the closure of a function is preserved."""
+    pickler.set_library(pickle_lib)
    self.assertEqual(
        'closure: abc',
        loads(dumps(module_test.get_lambda_with_closure('abc')))())

-  def test_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_class(self, pickle_lib):
    """Tests that a class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.Xyz))().foo('abc def'))

-  def test_object(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_object(self, pickle_lib):
    """Tests that a class instance is pickled correctly."""
+    pickler.set_library(pickle_lib)
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))

-  def test_nested_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_nested_class(self, pickle_lib):
    """Tests that a nested class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
    self.assertEqual(
        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
    self.assertEqual(
        'Y:abc',
        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)

-  def test_dynamic_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dynamic_class(self, pickle_lib):
    """Tests that a nested class object is pickled correctly."""
+    pickler.set_library(pickle_lib)
    self.assertEqual(
        'Z:abc', loads(dumps(module_test.create_class('abc'))).get())

-  def test_generators(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_generators(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    with self.assertRaises(TypeError):
      dumps((_ for _ in range(10)))

-  def test_recursive_class(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_recursive_class(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    self.assertEqual(
        'RecursiveClass:abc',
        loads(dumps(module_test.RecursiveClass('abc').datum)))

-  def test_pickle_rlock(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_pickle_rlock(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    rlock_instance = threading.RLock()
    rlock_type = type(rlock_instance)

    self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)

-  def test_save_paths(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_save_paths(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    f = loads(dumps(lambda x: x))
    co_filename = f.__code__.co_filename
    self.assertTrue(co_filename.endswith('pickler_test.py'))

  @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
-  def test_dump_and_load_mapping_proxy(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dump_and_load_mapping_proxy(self, pickle_lib):
+    pickler.set_library(pickle_lib)
    self.assertEqual(
        'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
    self.assertEqual(
        types.MappingProxyType, type(loads(dumps(types.MappingProxyType({})))))

  # pylint: disable=exec-used
  @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only')
-  def test_dataclass(self):
+  @parameterized.expand([
+      param(pickle_lib='dill'),
+      param(pickle_lib='cloudpickle'),
+  ])
+  def test_dataclass(self, pickle_lib):
    exec(
        '''
from apache_beam.internal.module_test import DataClass
@@ -141,6 +215,7 @@ def maybe_get_sets_with_different_iteration_orders(self):
    return set1, set2

  def test_best_effort_determinism(self):
+    pickler.set_library('dill')
    set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
    self.assertEqual(
        dumps(set1, enable_best_effort_determinism=True),
@@ -152,6 +227,7 @@ def test_best_effort_determinism(self):
      self.skipTest('Set iteration orders matched. Test results inconclusive.')

  def test_disable_best_effort_determinism(self):
+    pickler.set_library('dill')
    set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
    # The test relies on the sets having different iteration orders for the
    # elements. Iteration order is implementation dependent and undefined,
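
All of the converted tests above follow one pattern: parameterized.expand generates one test per pickler, and each body calls pickler.set_library before exercising a dumps/loads round trip. The pattern in isolation (RoundTripTest is a hypothetical example, not part of this PR):

import unittest

from parameterized import param
from parameterized import parameterized

from apache_beam.internal import pickler


class RoundTripTest(unittest.TestCase):
  @parameterized.expand([
      param(pickle_lib='dill'),
      param(pickle_lib='cloudpickle'),
  ])
  def test_roundtrip(self, pickle_lib):
    # One test method is generated per param(), so a failure names the
    # pickler that broke.
    pickler.set_library(pickle_lib)
    self.assertEqual([1, 2], pickler.loads(pickler.dumps([1, 2])))
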
15 changes: 12 additions & 3 deletions sdks/python/apache_beam/pipeline.py
@@ -191,9 +191,6 @@ def __init__(

    FileSystems.set_options(self._options)

-    pickle_library = self._options.view_as(SetupOptions).pickle_library
-    pickler.set_library(pickle_library)
-
    if runner is None:
      runner = self._options.view_as(StandardOptions).runner
    if runner is None:
@@ -210,6 +207,17 @@
          'Runner %s is not a PipelineRunner object or the '
          'name of a registered runner.' % runner)

+    # Runner can override the default pickler to be used.
+    if (self._options.view_as(SetupOptions).pickle_library == 'default' and
+        runner.default_pickle_library_override()):
+      logging.info(
+          "Runner defaulting to pickling library: %s.",
+          runner.default_pickle_library_override())
+      self._options.view_as(
+          SetupOptions).pickle_library = runner.default_pickle_library_override(
+          )
+    pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
+
    # Validate pipeline options
    errors = PipelineOptionsValidator(self._options, runner).validate()
    if errors:
@@ -228,6 +236,7 @@

    # Default runner to be used.
    self.runner = runner
+
    # Stack of transforms generated by nested apply() calls. The stack will
    # contain a root node as an enclosing (parent) node for top transforms.
    self.transforms_stack = [
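
Distilled, the new wiring is a three-step precedence rule: an explicit --pickle_library wins, a runner-supplied override applies only while the option is still 'default', and pickler.set_library now runs once, after the runner is resolved, rather than before. A standalone restatement (resolve_pickle_library is a hypothetical helper, not Beam API):

def resolve_pickle_library(user_choice, runner_override):
  # user_choice: the --pickle_library value; 'default' unless set explicitly.
  # runner_override: runner.default_pickle_library_override(), possibly falsy.
  if user_choice != 'default':
    return user_choice  # An explicit user setting always wins.
  if runner_override:
    return runner_override  # The runner's preference fills the gap.
  return 'default'  # Resolved by the pickler itself (now cloudpickle).


assert resolve_pickle_library('dill', 'cloudpickle') == 'dill'
assert resolve_pickle_library('default', 'cloudpickle') == 'cloudpickle'
assert resolve_pickle_library('default', None) == 'default'
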
34 changes: 30 additions & 4 deletions sdks/python/apache_beam/pipeline_test.py
@@ -41,6 +41,7 @@
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.pvalue import AsSingleton
from apache_beam.pvalue import TaggedOutput
+from apache_beam.runners.runner import PipelineRunner
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -156,6 +157,21 @@ def test_create(self):
    pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
    assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')

+  @mock.patch('logging.info')
+  def test_runner_overrides_default_pickler(self, mock_info):
+    with mock.patch.object(PipelineRunner,
+                           'default_pickle_library_override') as mock_fn:
+      mock_fn.return_value = 'dill'
+      with TestPipeline() as pipeline:
+        pcoll = pipeline | 'label1' >> Create([1, 2, 3])
+        assert_that(pcoll, equal_to([1, 2, 3]))
+
+      from apache_beam.internal import pickler
+      from apache_beam.internal import dill_pickler
+      self.assertIs(pickler.desired_pickle_lib, dill_pickler)
+      mock_info.assert_any_call(
+          'Runner defaulting to pickling library: %s.', 'dill')
+
  def test_flatmap_builtin(self):
    with TestPipeline() as pipeline:
      pcoll = pipeline | 'label1' >> Create([1, 2, 3])
@@ -279,17 +295,27 @@ def test_no_wait_until_finish(self, mock_info):
    with Pipeline(runner='DirectRunner',
                  options=PipelineOptions(["--no_wait_until_finish"])) as p:
      _ = p | beam.Create(['test'])
-    mock_info.assert_called_once_with(
+    mock_info.assert_any_call(
        'Job execution continues without waiting for completion. '
        'Use "wait_until_finish" in PipelineResult to block until finished.')
    p.result.wait_until_finish()

  def test_auto_unique_labels(self):

    opts = PipelineOptions(["--auto_unique_labels"])
-    with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
-      mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
-      mock_uuid_gen.side_effect = mock_uuids
+
+    mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
+    mock_uuid_gen = mock.Mock(side_effect=mock_uuids)
+
+    original_generate_unique_label = Pipeline._generate_unique_label
+
+    def patched_generate_unique_label(self, transform):
+      with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()):
+        return original_generate_unique_label(self, transform)
+
+    with mock.patch.object(Pipeline,
+                           '_generate_unique_label',
+                           patched_generate_unique_label):
      with TestPipeline(options=opts) as pipeline:
        pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])

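
A side note on the reworked test_auto_unique_labels: patching uuid.uuid4 for the whole pipeline run is fragile once anything else in that run also calls uuid4, since unrelated calls would consume the two-entry side_effect list. Scoping the patch to each _generate_unique_label call keeps the fakes exactly where they are asserted. The scoped-patch idiom on its own (label_for is a hypothetical helper):

import uuid
from unittest import mock


def label_for(name):
  return '%s_%s' % (name, uuid.uuid4().hex)


fake = mock.Mock(hex='UUID01XXX')
# uuid.uuid4 is replaced only inside this block; callers outside it keep
# getting real UUIDs, so the fake cannot leak into unrelated code.
with mock.patch.object(uuid, 'uuid4', return_value=fake):
  assert label_for('pcoll') == 'pcoll_UUID01XXX'
assert label_for('pcoll') != 'pcoll_UUID01XXX'
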
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -98,6 +98,9 @@ class DataflowRunner(PipelineRunner):
  def __init__(self, cache=None):
    self._default_environment = None

+  def default_pickle_library_override(self):
+    return 'cloudpickle'
+
  def is_fnapi_compatible(self):
    return False
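
Any runner can supply the same hint: per the check added in pipeline.py above, it is consulted only when --pickle_library is left at 'default', which implies the base-class hook returns a falsy value. A hypothetical custom runner pinning dill (LegacyDillRunner is illustrative, not part of this PR):

from apache_beam.runners.runner import PipelineRunner


class LegacyDillRunner(PipelineRunner):
  """Illustrative runner whose workers still require dill."""
  def default_pickle_library_override(self):
    # Applied by Pipeline.__init__ only when the user has not chosen a
    # pickle library explicitly.
    return 'dill'
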
24 changes: 16 additions & 8 deletions sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -41,6 +41,15 @@
from apache_beam.testing import test_pipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.utils import shared
+
+
+class DictWrapper(dict):
+  pass
+
+
+def acquire_dict():
+  return DictWrapper()


class DirectPipelineResultTest(unittest.TestCase):
@@ -159,18 +168,17 @@ def test_retry_fork_graph(self):
    # currently does not currently support retries.
    p = beam.Pipeline(runner='BundleBasedDirectRunner')

-    # TODO(mariagh): Remove the use of globals from the test.
-    global count_b, count_c  # pylint: disable=global-variable-undefined
-    count_b, count_c = 0, 0
+    shared_handler = shared.Shared()
+    counts = shared_handler.acquire(acquire_dict)
+    counts['count_b'] = 0
+    counts['count_c'] = 0

    def f_b(x):
-      global count_b  # pylint: disable=global-variable-undefined
-      count_b += 1
+      shared_handler.acquire(acquire_dict)['count_b'] += 1
      raise Exception('exception in f_b')

    def f_c(x):
-      global count_c  # pylint: disable=global-variable-undefined
-      count_c += 1
+      shared_handler.acquire(acquire_dict)['count_c'] += 1
      raise Exception('exception in f_c')

    names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
@@ ... @@

    with self.assertRaises(Exception):
      p.run().wait_until_finish()
-    assert count_b == count_c == 4
+    assert counts['count_b'] == counts['count_c'] == 4
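
The test rewrite swaps module globals for apache_beam.utils.shared.Shared, which returns the same live object for every acquire() on a handle. The DictWrapper subclass exists because Shared tracks its payload with a weak reference, and a plain dict cannot be weakly referenced. The idiom on its own (a sketch with an illustrative counter, not from the PR):

from apache_beam.utils import shared


class Counter(dict):
  """dict subclass: needed because plain dicts are not weak-referenceable."""


def make_counter():
  return Counter(calls=0)


handle = shared.Shared()
# Every acquire() on the same handle yields the same live object, so state
# survives across the DoFn invocations that re-acquire it.
counts = handle.acquire(make_counter)
counts['calls'] += 1
assert handle.acquire(make_counter)['calls'] == 1
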