2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 27
"modification": 28
}

11 changes: 8 additions & 3 deletions sdks/python/apache_beam/coders/coders.py
@@ -85,9 +85,7 @@
# occurs.
from apache_beam.internal.dill_pickler import dill
except ImportError:
# We fall back to using the stock dill library in tests that don't use the
# full Python SDK.
import dill
dill = None

__all__ = [
'Coder',
@@ -900,6 +898,13 @@ def to_type_hint(self):

class DillCoder(_PickleCoderBase):
"""Coder using dill's pickle functionality."""
def __init__(self):
if not dill:
raise RuntimeError(
"This pipeline contains a DillCoder which requires "
"the dill package. Install the dill package with the dill extra "
"e.g. apache-beam[dill]")

def _create_impl(self):
return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)

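For reference, here is a minimal standalone sketch of the optional-import / fail-fast pattern this file now uses. The class below is illustrative only; the encode/decode bodies are assumptions, not Beam's actual DillCoder:

```python
# Sketch of the optional-dependency pattern from the diff above.
# Not the real Beam implementation; encode/decode are placeholders.
try:
    import dill  # optional extra: pip install "apache-beam[dill]"
except ImportError:
    dill = None


class DillCoder:
    """Fails fast at construction time when dill is not installed."""
    def __init__(self):
        if dill is None:
            raise RuntimeError(
                "DillCoder requires the dill package; install the dill extra, "
                "e.g. apache-beam[dill].")

    def encode(self, value):
        return dill.dumps(value)  # only reachable when dill imported

    def decode(self, encoded):
        return dill.loads(encoded)
```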
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/coders/coders_test_common.py
@@ -59,6 +59,11 @@
except ImportError:
dataclasses = None # type: ignore

try:
import dill
except ImportError:
dill = None

MyNamedTuple = collections.namedtuple('A', ['x', 'y']) # type: ignore[name-match]
AnotherNamedTuple = collections.namedtuple('AnotherNamedTuple', ['x', 'y'])
MyTypedNamedTuple = NamedTuple('MyTypedNamedTuple', [('f1', int), ('f2', str)])
@@ -116,6 +121,7 @@ class UnFrozenDataClass:
# These tests need to all be run in the same process due to the asserts
# in tearDownClass.
@pytest.mark.no_xdist
@pytest.mark.uses_dill
class CodersTest(unittest.TestCase):

# These class methods ensure that we test each defined coder in both
@@ -173,6 +179,9 @@ def tearDownClass(cls):
coders.BigIntegerCoder, # tested in DecimalCoder
coders.TimestampPrefixingOpaqueWindowCoder,
])
if not dill:
standard -= set(
[coders.DillCoder, coders.DeterministicFastPrimitivesCoder])
cls.seen_nested -= set(
[coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder])
assert not standard - cls.seen, str(standard - cls.seen)
@@ -241,8 +250,13 @@ def test_memoizing_pickle_coder(self):
param(compat_version="2.67.0"),
])
def test_deterministic_coder(self, compat_version):

typecoders.registry.update_compatibility_version = compat_version
coder = coders.FastPrimitivesCoder()
if not dill and compat_version:
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
deterministic_coder = coder.as_deterministic_coder(step_label="step")

self.check_coder(deterministic_coder, *self.test_values_deterministic)
@@ -321,6 +335,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
coder = coders.MapCoder(
coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())

if not dill and compat_version:
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')

deterministic_coder = coder.as_deterministic_coder(step_label="step")

assert isinstance(
@@ -331,6 +350,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
self.check_coder(deterministic_coder, *values)

def test_dill_coder(self):
if not dill:
with self.assertRaises(RuntimeError):
coders.DillCoder()
self.skipTest('Dill not installed')

cell_value = (lambda x: lambda: x)(0).__closure__[0]
self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
self.check_coder(
@@ -661,6 +685,8 @@ def test_param_windowed_value_coder(self):
def test_cross_process_encoding_of_special_types_is_deterministic(
self, compat_version):
"""Test cross-process determinism for all special deterministic types"""
if compat_version:
pytest.importorskip("dill")

if sys.executable is None:
self.skipTest('No Python interpreter found')
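The test updates above follow one convention: when dill is absent, first assert that the dill-dependent path raises RuntimeError, then skip the remainder of the case. A self-contained sketch of that convention (OptionalCoder is a hypothetical stand-in for DillCoder):

```python
# Sketch of the assert-then-skip convention used in coders_test_common.py.
import unittest

try:
    import dill
except ImportError:
    dill = None


class OptionalCoder:
    def __init__(self):
        if dill is None:
            raise RuntimeError("OptionalCoder requires the dill package")


class OptionalCoderTest(unittest.TestCase):
    def test_round_trip(self):
        if not dill:
            # Without dill, construction must fail loudly...
            with self.assertRaises(RuntimeError):
                OptionalCoder()
            # ...and the rest of the test cannot run.
            self.skipTest('dill not installed')
        coder = OptionalCoder()
        self.assertIsInstance(coder, OptionalCoder)


if __name__ == '__main__':
    unittest.main()
```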
26 changes: 23 additions & 3 deletions sdks/python/apache_beam/internal/pickler.py
@@ -29,10 +29,15 @@
"""

from apache_beam.internal import cloudpickle_pickler
from apache_beam.internal import dill_pickler

try:
from apache_beam.internal import dill_pickler
except ImportError:
dill_pickler = None # type: ignore[assignment]

USE_CLOUDPICKLE = 'cloudpickle'
USE_DILL = 'dill'
USE_DILL_UNSAFE = 'dill_unsafe'

DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE
desired_pickle_lib = cloudpickle_pickler
@@ -74,14 +79,29 @@ def load_session(file_path):
def set_library(selected_library=DEFAULT_PICKLE_LIB):
""" Sets pickle library that will be used. """
global desired_pickle_lib

if selected_library == USE_DILL and not dill_pickler:
raise ImportError(
"Pipeline option pickle_library=dill is set, but dill is not "
"installed. Install apache-beam with the dill extras package "
"e.g. apache-beam[dill].")
if selected_library == USE_DILL_UNSAFE and not dill_pickler:
raise ImportError(
"Pipeline option pickle_library=dill_unsafe is set, but dill is not "
"installed. Install dill in job submission and runtime environments.")

is_currently_dill = (desired_pickle_lib == dill_pickler)
dill_is_requested = (
selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)

# If switching to or from dill, update the pickler hook overrides.
if (selected_library == USE_DILL) != (desired_pickle_lib == dill_pickler):
if is_currently_dill != dill_is_requested:
dill_pickler.override_pickler_hooks(selected_library == USE_DILL)

if selected_library == 'default':
selected_library = DEFAULT_PICKLE_LIB

if selected_library == USE_DILL:
if dill_is_requested:
desired_pickle_lib = dill_pickler
elif selected_library == USE_CLOUDPICKLE:
desired_pickle_lib = cloudpickle_pickler
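With this change, requesting the dill pickler on an installation without the dill extra fails at set_library time rather than at the first pickling call. A usage sketch against the module as modified above:

```python
# Usage sketch for apache_beam.internal.pickler after this change.
from apache_beam.internal import pickler

# cloudpickle is the default and always available.
pickler.set_library('cloudpickle')

try:
    # Raises ImportError when apache-beam was installed without the dill extra.
    pickler.set_library('dill')
except ImportError as exc:
    print(f'dill pickler unavailable: {exc}')
```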
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/internal/pickler_test.py
@@ -25,6 +25,7 @@
import types
import unittest

import pytest
from parameterized import param
from parameterized import parameterized

@@ -34,6 +35,12 @@
from apache_beam.internal.pickler import loads


def maybe_skip_if_no_dill(pickle_library):
if pickle_library == 'dill':
pytest.importorskip("dill")


@pytest.mark.uses_dill
class PicklerTest(unittest.TestCase):

NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
@@ -43,6 +50,7 @@ class PicklerTest(unittest.TestCase):
param(pickle_lib='cloudpickle'),
])
def test_basics(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)

self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
@@ -55,6 +63,7 @@
])
def test_lambda_with_globals(self, pickle_lib):
"""Tests that the globals of a function are preserved."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)

# The point of the test is that the lambda being called after unpickling
@@ -68,6 +77,7 @@ def test_lambda_with_globals(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_lambda_with_main_globals(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(unittest, loads(dumps(lambda: unittest))())

@@ -77,6 +87,7 @@ def test_lambda_with_main_globals(self, pickle_lib):
])
def test_lambda_with_closure(self, pickle_lib):
"""Tests that the closure of a function is preserved."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(
'closure: abc',
@@ -88,6 +99,7 @@ def test_lambda_with_closure(self, pickle_lib):
])
def test_class(self, pickle_lib):
"""Tests that a class object is pickled correctly."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(['abc', 'def'],
loads(dumps(module_test.Xyz))().foo('abc def'))
@@ -98,6 +110,7 @@ def test_class(self, pickle_lib):
])
def test_object(self, pickle_lib):
"""Tests that a class instance is pickled correctly."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(['abc', 'def'],
loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
@@ -108,6 +121,7 @@ def test_object(self, pickle_lib):
])
def test_nested_class(self, pickle_lib):
"""Tests that a nested class object is pickled correctly."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(
'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
@@ -121,6 +135,7 @@ def test_nested_class(self, pickle_lib):
])
def test_dynamic_class(self, pickle_lib):
"""Tests that a nested class object is pickled correctly."""
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(
'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
@@ -130,6 +145,7 @@ def test_dynamic_class(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_generators(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
with self.assertRaises(TypeError):
dumps((_ for _ in range(10)))
@@ -139,6 +155,7 @@ def test_generators(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_recursive_class(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(
'RecursiveClass:abc',
@@ -149,6 +166,7 @@ def test_recursive_class(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_pickle_rlock(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
rlock_instance = threading.RLock()
rlock_type = type(rlock_instance)
@@ -160,6 +178,7 @@ def test_pickle_rlock(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_save_paths(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
f = loads(dumps(lambda x: x))
co_filename = f.__code__.co_filename
@@ -171,6 +190,7 @@ def test_save_paths(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_dump_and_load_mapping_proxy(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)
self.assertEqual(
'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
@@ -184,6 +204,7 @@ def test_dump_and_load_mapping_proxy(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_dataclass(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
exec(
'''
from apache_beam.internal.module_test import DataClass
@@ -195,6 +216,7 @@ def test_dataclass(self, pickle_lib):
param(pickle_lib='cloudpickle'),
])
def test_class_states_not_changed_at_subsequent_loading(self, pickle_lib):
maybe_skip_if_no_dill(pickle_lib)
pickler.set_library(pickle_lib)

class Local:
@@ -255,6 +277,7 @@ def maybe_get_sets_with_different_iteration_orders(self):
return set1, set2

def test_best_effort_determinism(self):
maybe_skip_if_no_dill('dill')
pickler.set_library('dill')
set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
self.assertEqual(
@@ -267,6 +290,7 @@ def test_best_effort_determinism(self):
self.skipTest('Set iteration orders matched. Test results inconclusive.')

def test_disable_best_effort_determinism(self):
maybe_skip_if_no_dill('dill')
pickler.set_library('dill')
set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
# The test relies on the sets having different iteration orders for the
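Only the dill parameter of each parameterized case needs the optional package, so the skip happens inside the test body via maybe_skip_if_no_dill rather than as a class-level skip. A standalone sketch of that shape (the round-trip body below is a placeholder):

```python
# Sketch: skipping only the dill parameter of a parameterized test.
# Intended to run under pytest, as Beam's suite does.
import unittest

import pytest
from parameterized import param, parameterized


def maybe_skip_if_no_dill(pickle_library):
    if pickle_library == 'dill':
        # Skips the calling test when dill cannot be imported.
        pytest.importorskip('dill')


class RoundTripTest(unittest.TestCase):
    @parameterized.expand([
        param(pickle_lib='dill'),
        param(pickle_lib='cloudpickle'),
    ])
    def test_round_trip(self, pickle_lib):
        maybe_skip_if_no_dill(pickle_lib)
        # The cloudpickle case always runs; the dill case runs only when
        # dill is importable.
        self.assertIn(pickle_lib, ('dill', 'cloudpickle'))
```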
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -63,6 +63,11 @@
except ImportError:
raise unittest.SkipTest('GCP dependencies are not installed')

try:
import dill
except ImportError:
dill = None

_LOGGER = logging.getLogger(__name__)

_DESTINATION_ELEMENT_PAIRS = [
@@ -406,6 +411,13 @@ def test_partition_files_dofn_size_split(self):
label='CheckSinglePartition')


def maybe_skip(compat_version):
if compat_version and not dill:
raise unittest.SkipTest(
'Dill dependency not installed which is required for compat_version'
' <= 2.67.0')


class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
def test_trigger_load_jobs_with_empty_files(self):
destination = "project:dataset.table"
@@ -485,7 +497,9 @@ def test_records_traverse_transform_with_mocks(self):
param(compat_version=None),
param(compat_version="2.64.0"),
])
@pytest.mark.uses_dill
def test_reshuffle_before_load(self, compat_version):
maybe_skip(compat_version)
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
@@ -994,6 +1008,7 @@ def dynamic_destination_resolver(element, *side_inputs):
])
def test_triggering_frequency(
self, is_streaming, with_auto_sharding, compat_version):
maybe_skip(compat_version)
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/ml/anomaly/specifiable_test.py
@@ -22,6 +22,7 @@
import unittest
from typing import Optional

import pytest
from parameterized import parameterized

from apache_beam.internal.cloudpickle import cloudpickle
@@ -323,7 +324,10 @@ def __init__(self, arg):
self.my_arg = arg * 10
type(self).counter += 1

def test_on_pickle(self):
@pytest.mark.uses_dill
def test_on_dill_pickle(self):
pytest.importorskip("dill")

FooForPickle = TestInitCallCount.FooForPickle

import dill
@@ -339,6 +343,9 @@ def test_on_pickle(self):
self.assertEqual(FooForPickle.counter, 1)
self.assertEqual(new_foo_2.__dict__, foo.__dict__)

def test_on_pickle(self):
FooForPickle = TestInitCallCount.FooForPickle

# Note that pickle does not support classes/functions nested in a function.
import pickle
FooForPickle.counter = 0
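test_on_pickle previously covered both dill and stdlib pickle round-trips; it is now split so the dill half can be skipped when dill is absent. A minimal sketch of the split (Foo is a hypothetical module-level class, since pickle cannot serialize classes nested in a function; the real test uses a nested class with extra bookkeeping):

```python
# Sketch: splitting a round-trip test into dill and stdlib-pickle variants.
import pickle
import unittest

import pytest


class Foo:
    def __init__(self, arg):
        self.my_arg = arg * 10


class RoundTripTest(unittest.TestCase):
    def test_on_dill_pickle(self):
        dill = pytest.importorskip('dill')  # skip when dill is not installed
        foo = Foo(3)
        self.assertEqual(dill.loads(dill.dumps(foo)).__dict__, foo.__dict__)

    def test_on_pickle(self):
        foo = Foo(3)
        self.assertEqual(pickle.loads(pickle.dumps(foo)).__dict__, foo.__dict__)
```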