From bae3e8b9bd4d066cb9ceb254bcc924e6531939c3 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 8 Sep 2025 19:35:40 -0400 Subject: [PATCH 1/8] Make dill optional --- .../.run/Run IDE with Plugin.run.xml | 2 +- sdks/python/apache_beam/coders/coders.py | 10 +++-- .../apache_beam/coders/coders_test_common.py | 26 +++++++++++ sdks/python/apache_beam/internal/pickler.py | 12 ++++- .../apache_beam/internal/pickler_test.py | 24 ++++++++++ .../ml/anomaly/specifiable_test.py | 9 +++- .../apache_beam/options/pipeline_options.py | 3 +- .../options/pipeline_options_validator.py | 28 ++++++++++++ .../pipeline_options_validator_test.py | 44 +++++++++++++++++++ .../runners/portability/stager_test.py | 2 + .../transforms/combinefn_lifecycle_test.py | 5 +++ .../apache_beam/typehints/schemas_test.py | 11 +++-- .../base_image_requirements_manual.txt | 3 ++ sdks/python/pytest.ini | 1 + sdks/python/setup.py | 14 +++--- sdks/python/test-suites/tox/common.gradle | 3 ++ sdks/python/tox.ini | 8 ++++ 17 files changed, 189 insertions(+), 16 deletions(-) diff --git a/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml b/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml index 46f05949227e..6bbfd792f7cd 100644 --- a/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml +++ b/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml @@ -40,4 +40,4 @@ false - \ No newline at end of file + diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e527185bd571..a9cd8ed626b6 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -85,9 +85,7 @@ # occurs. from apache_beam.internal.dill_pickler import dill except ImportError: - # We fall back to using the stock dill library in tests that don't use the - # full Python SDK. 
- import dill + dill = None # type: ignore __all__ = [ 'Coder', @@ -900,6 +898,12 @@ def to_type_hint(self): class DillCoder(_PickleCoderBase): """Coder using dill's pickle functionality.""" + def __init__(self): + if not dill: + raise RuntimeError("This pipeline contains a DillCoder which requires " \ + "the dill package. Install the dill package with the dill extra " \ + "e.g. apache-beam[dill]") + def _create_impl(self): return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 587e5d87522e..1ae9a32790ac 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -59,6 +59,11 @@ except ImportError: dataclasses = None # type: ignore +try: + import dill +except ImportError: + dill = None + MyNamedTuple = collections.namedtuple('A', ['x', 'y']) # type: ignore[name-match] AnotherNamedTuple = collections.namedtuple('AnotherNamedTuple', ['x', 'y']) MyTypedNamedTuple = NamedTuple('MyTypedNamedTuple', [('f1', int), ('f2', str)]) @@ -116,6 +121,7 @@ class UnFrozenDataClass: # These tests need to all be run in the same process due to the asserts # in tearDownClass. 
@pytest.mark.no_xdist +@pytest.mark.uses_dill class CodersTest(unittest.TestCase): # These class methods ensure that we test each defined coder in both @@ -173,6 +179,9 @@ def tearDownClass(cls): coders.BigIntegerCoder, # tested in DecimalCoder coders.TimestampPrefixingOpaqueWindowCoder, ]) + if not dill: + standard -= set( + [coders.DillCoder, coders.DeterministicFastPrimitivesCoder]) cls.seen_nested -= set( [coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder]) assert not standard - cls.seen, str(standard - cls.seen) @@ -241,8 +250,13 @@ def test_memoizing_pickle_coder(self): param(compat_version="2.67.0"), ]) def test_deterministic_coder(self, compat_version): + typecoders.registry.update_compatibility_version = compat_version coder = coders.FastPrimitivesCoder() + if not dill and compat_version: + with self.assertRaises(RuntimeError): + coder.as_deterministic_coder(step_label="step") + self.skipTest('Dill not installed') deterministic_coder = coder.as_deterministic_coder(step_label="step") self.check_coder(deterministic_coder, *self.test_values_deterministic) @@ -321,6 +335,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version): coder = coders.MapCoder( coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder()) + if not dill and compat_version: + with self.assertRaises(RuntimeError): + coder.as_deterministic_coder(step_label="step") + self.skipTest('Dill not installed') + deterministic_coder = coder.as_deterministic_coder(step_label="step") assert isinstance( @@ -331,6 +350,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version): self.check_coder(deterministic_coder, *values) def test_dill_coder(self): + if not dill: + with self.assertRaises(RuntimeError): + coders.DillCoder() + self.skipTest('Dill not installed') + cell_value = (lambda x: lambda: x)(0).__closure__[0] self.check_coder(coders.DillCoder(), 'a', 1, cell_value) self.check_coder( @@ -661,6 +685,8 @@ def test_param_windowed_value_coder(self): 
def test_cross_process_encoding_of_special_types_is_deterministic( self, compat_version): """Test cross-process determinism for all special deterministic types""" + if compat_version: + pytest.importorskip("dill") if sys.executable is None: self.skipTest('No Python interpreter found') diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 256f88c5453f..b4e60f4cb380 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -29,7 +29,11 @@ """ from apache_beam.internal import cloudpickle_pickler -from apache_beam.internal import dill_pickler + +try: + from apache_beam.internal import dill_pickler +except ImportError: + dill_pickler = None USE_CLOUDPICKLE = 'cloudpickle' USE_DILL = 'dill' @@ -74,6 +78,12 @@ def load_session(file_path): def set_library(selected_library=DEFAULT_PICKLE_LIB): """ Sets pickle library that will be used. """ global desired_pickle_lib + + if selected_library == USE_DILL and not dill_pickler: + if not dill_pickler: + raise ImportError( + "Dill is not installed. To use dill pickler, install apache-beam with" + " the dill extras package e.g. apache-beam[dill].") # If switching to or from dill, update the pickler hook overrides. 
if (selected_library == USE_DILL) != (desired_pickle_lib == dill_pickler): dill_pickler.override_pickler_hooks(selected_library == USE_DILL) diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 7048f680de87..84c70b325649 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -19,6 +19,7 @@ # pytype: skip-file +import pytest import random import sys import threading @@ -34,6 +35,12 @@ from apache_beam.internal.pickler import loads +def maybe_skip_if_no_dill(pickle_library): + if pickle_library == 'dill': + pytest.importorskip("dill") + + +@pytest.mark.uses_dill class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") @@ -43,6 +50,7 @@ class PicklerTest(unittest.TestCase): param(pickle_lib='cloudpickle'), ]) def test_basics(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )]))) @@ -55,6 +63,7 @@ def test_basics(self, pickle_lib): ]) def test_lambda_with_globals(self, pickle_lib): """Tests that the globals of a function are preserved.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) # The point of the test is that the lambda being called after unpickling @@ -68,6 +77,7 @@ def test_lambda_with_globals(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_lambda_with_main_globals(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(unittest, loads(dumps(lambda: unittest))()) @@ -77,6 +87,7 @@ def test_lambda_with_main_globals(self, pickle_lib): ]) def test_lambda_with_closure(self, pickle_lib): """Tests that the closure of a function is preserved.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'closure: abc', @@ -88,6 +99,7 @@ def test_lambda_with_closure(self, pickle_lib): ]) 
def test_class(self, pickle_lib): """Tests that a class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.Xyz))().foo('abc def')) @@ -98,6 +110,7 @@ def test_class(self, pickle_lib): ]) def test_object(self, pickle_lib): """Tests that a class instance is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) @@ -108,6 +121,7 @@ def test_object(self, pickle_lib): ]) def test_nested_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) @@ -121,6 +135,7 @@ def test_nested_class(self, pickle_lib): ]) def test_dynamic_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) @@ -130,6 +145,7 @@ def test_dynamic_class(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_generators(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) with self.assertRaises(TypeError): dumps((_ for _ in range(10))) @@ -139,6 +155,7 @@ def test_generators(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_recursive_class(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'RecursiveClass:abc', @@ -149,6 +166,7 @@ def test_recursive_class(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_pickle_rlock(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) rlock_instance = threading.RLock() rlock_type = type(rlock_instance) @@ -160,6 +178,7 @@ def 
test_pickle_rlock(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_save_paths(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) f = loads(dumps(lambda x: x)) co_filename = f.__code__.co_filename @@ -171,6 +190,7 @@ def test_save_paths(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_dump_and_load_mapping_proxy(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc']) @@ -184,6 +204,7 @@ def test_dump_and_load_mapping_proxy(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_dataclass(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) exec( ''' from apache_beam.internal.module_test import DataClass @@ -195,6 +216,7 @@ def test_dataclass(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_class_states_not_changed_at_subsequent_loading(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) class Local: @@ -255,6 +277,7 @@ def maybe_get_sets_with_different_iteration_orders(self): return set1, set2 def test_best_effort_determinism(self): + maybe_skip_if_no_dill('dill') pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() self.assertEqual( @@ -267,6 +290,7 @@ def test_best_effort_determinism(self): self.skipTest('Set iteration orders matched. 
Test results inconclusive.') def test_disable_best_effort_determinism(self): + maybe_skip_if_no_dill('dill') pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() # The test relies on the sets having different iteration orders for the diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py index ccd8efd286cb..0e50bf71f8fe 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py @@ -19,6 +19,7 @@ import dataclasses import logging import os +import pytest import unittest from typing import Optional @@ -323,7 +324,10 @@ def __init__(self, arg): self.my_arg = arg * 10 type(self).counter += 1 - def test_on_pickle(self): + @pytest.mark.uses_dill + def test_on_dill_pickle(self): + pytest.importorskip("dill") + FooForPickle = TestInitCallCount.FooForPickle import dill @@ -339,6 +343,9 @@ def test_on_pickle(self): self.assertEqual(FooForPickle.counter, 1) self.assertEqual(new_foo_2.__dict__, foo.__dict__) + def test_on_pickle(self): + FooForPickle = TestInitCallCount.FooForPickle + # Note that pickle does not support classes/functions nested in a function. import pickle FooForPickle.counter = 0 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c30a902063e0..a5f3d81a1564 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1619,7 +1619,7 @@ def _add_argparse_args(cls, parser): help=( 'Chooses which pickle library to use. 
Options are dill, ' 'cloudpickle or default.'), - choices=['cloudpickle', 'default', 'dill']) + choices=['cloudpickle', 'default', 'dill', 'dill_unsafe']) parser.add_argument( '--save_main_session', default=False, @@ -1701,6 +1701,7 @@ def _add_argparse_args(cls, parser): def validate(self, validator): errors = [] errors.extend(validator.validate_container_prebuilding_options(self)) + errors.extend(validator.validate_pickle_library(self)) return errors diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py index ebe9c8f223ce..c103ea8bb916 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator.py @@ -119,6 +119,15 @@ class PipelineOptionsValidator(object): ERR_REPEATABLE_OPTIONS_NOT_SET_AS_LIST = ( '(%s) is a string. Programmatically set PipelineOptions like (%s) ' 'options need to be specified as a list.') + ERR_DILL_NOT_INSTALLED = ( + 'Option pickle_library=dill requires dill==0.3.1.1. Install apache-beam ' + 'with the dill extra e.g. apache-beam[gcp, dill]. Dill package was not ' + 'found') + ERR_UNSAFE_DILL_VERSION = ( + 'Dill version 0.3.1.1 is required when using pickle_library=dill. Other' + 'versions of dill are untested with Apache Beam. To install the supported' + 'dill version instal apache-beam[dill] extra. To use an unsupported dill ' + 'version, use pickle_library=dill_unsafe. %s') # GCS path specific patterns. GCS_URI = '(?P[^:]+)://(?P[^/]+)(/(?P.*))?' 
@@ -196,6 +205,25 @@ def validate_gcs_path(self, view, arg_name): return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name) return [] + def validate_pickle_library(self, view): + """Validates that dill 0.3.1.1 is installed for pickle_library=dill.""" + if view.pickle_library == 'default' or view.pickle_library == 'cloudpickle': + return [] + + if view.pickle_library == 'dill_unsafe': + return [] + + if view.pickle_library == 'dill': + try: + import dill + if dill.__version__ != "0.3.1.1": + return self._validate_error( + self.ERR_UNSAFE_DILL_VERSION, + f"Dill version found {dill.__version__}") + except ImportError: + return self._validate_error(self.ERR_DILL_NOT_INSTALLED) + return [] + def validate_cloud_options(self, view): """Validates job_name and project arguments.""" errors = [] diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py index 56f305a01b74..64397dbbc6b3 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py @@ -22,6 +22,8 @@ import logging import unittest +import pytest + from hamcrest import assert_that from hamcrest import contains_string from hamcrest import only_contains @@ -244,6 +246,48 @@ def test_is_service_runner(self, runner, options, expected): validator = PipelineOptionsValidator(PipelineOptions(options), runner) self.assertEqual(validator.is_service_runner(), expected) + def test_pickle_library_dill_not_installed_returns_error(self): + runner = MockRunners.OtherRunner() + # Remove default region for this test. 
+ options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 1, errors) + self.assertIn("Option pickle_library=dill requires dill", errors[0]) + + @pytest.mark.uses_dill + def test_pickle_library_dill_installed_returns_no_error(self): + pytest.importorskip("dill") + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 0, errors) + + @pytest.mark.uses_dill + def test_pickle_library_dill_installed_returns_wrong_version(self): + pytest.importorskip("dill") + with unittest.mock.patch('dill.__version__', '0.3.6'): + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 1, errors) + self.assertIn("Dill version 0.3.1.1 is required when using ", errors[0]) + + @pytest.mark.uses_dill + def test_pickle_library_dill_unsafe_no_error(self): + pytest.importorskip("dill") + with unittest.mock.patch('dill.__version__', '0.3.6'): + runner = MockRunners.OtherRunner() + # Remove default region for this test. 
+ options = PipelineOptions(['--pickle_library=dill_unsafe']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 0, errors) + def test_dataflow_job_file_and_template_location_mutually_exclusive(self): runner = MockRunners.OtherRunner() options = PipelineOptions( diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 60e247080665..22a41e592c2b 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -173,11 +173,13 @@ def test_no_main_session(self): # xdist adds unpicklable modules to the main session. @pytest.mark.no_xdist + @pytest.mark.uses_dill @unittest.skipIf( sys.platform == "win32" and sys.version_info < (3, 8), 'https://github.com/apache/beam/issues/20659: pytest on Windows pulls ' 'in a zipimporter, unpicklable before py3.8') def test_with_main_session(self): + pytest.importorskip("dill") staging_dir = self.make_temp_dir() options = PipelineOptions() diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py index 647e08db7aaa..69172a55f246 100644 --- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py +++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py @@ -59,7 +59,12 @@ def test_combining_value_state(self): {'runner': fn_api_runner.FnApiRunner, 'pickler': 'dill'}, {'runner': fn_api_runner.FnApiRunner, 'pickler': 'cloudpickle'}, ]) # yapf: disable +@pytest.mark.uses_dill class LocalCombineFnLifecycleTest(unittest.TestCase): + def setUp(self): + if self.pickler == 'dill': + pytest.importorskip("dill") + def tearDown(self): CallSequenceEnforcingCombineFn.instances.clear() diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 
6cf37322147e..78e04ce4487e 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -21,6 +21,7 @@ import itertools import pickle +import pytest import unittest from typing import Any from typing import ByteString @@ -30,7 +31,6 @@ from typing import Optional from typing import Sequence -import dill import numpy as np from hypothesis import given from hypothesis import settings @@ -711,13 +711,18 @@ def test_named_fields_roundtrip(self, named_fields): 'pickler': pickle, }, { - 'pickler': dill, + 'pickler': 'dill', }, { 'pickler': cloudpickle, }, ]) +@pytest.mark.uses_dill class PickleTest(unittest.TestCase): + def setUp(self): + if PickleTest.pickler == 'dill': + self.pickler = pytest.importorskip("dill") + def test_generated_class_pickle_instance(self): schema = schema_pb2.Schema( id="some-uuid", @@ -733,7 +738,7 @@ def test_generated_class_pickle_instance(self): self.assertEqual(instance, self.pickler.loads(self.pickler.dumps(instance))) def test_generated_class_pickle(self): - if self.pickler in [pickle, dill]: + if self.pickler in [pickle, pytest.importorskip("dill")]: self.skipTest('https://github.com/apache/beam/issues/22714') schema = schema_pb2.Schema( diff --git a/sdks/python/container/base_image_requirements_manual.txt b/sdks/python/container/base_image_requirements_manual.txt index bef89e9fd31e..536f62c27f5d 100644 --- a/sdks/python/container/base_image_requirements_manual.txt +++ b/sdks/python/container/base_image_requirements_manual.txt @@ -40,3 +40,6 @@ google-crc32c scipy scikit-learn build>=1.0,<2 # tool to build sdist from setup.py in stager. +# Dill 0.3.1.1 is included as a base manual requirement so it is available to users +# with pickle_library=dill, but apache-beam does not have a hard dependency. 
+dill>=0.3.1.1,<0.3.2 diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index cb244025812d..3eee1a5c0e80 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -71,6 +71,7 @@ markers = uses_feast: tests that uses feast in some way gemini_postcommit: gemini postcommits that need additional deps. require_docker_in_docker: tests that require running Docker inside Docker (Docker-in-Docker), which is not supported on Beam’s self-hosted runners. Context: https://github.com/apache/beam/pull/35585 + uses_dill: tests that require dill pickle library. # Default timeout intended for unit tests. # If certain tests need a different value, please see the docs on how to diff --git a/sdks/python/setup.py b/sdks/python/setup.py index e7ffc0c9780c..c91466c5778f 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -360,12 +360,6 @@ def get_portability_package_data(): install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', - # Dill doesn't have forwards-compatibility guarantees within minor - # version. Pickles created with a new version of dill may not unpickle - # using older version of dill. It is best to use the same version of - # dill on client and server, therefore list of allowed versions is - # very narrow. See: https://github.com/uqfoundation/dill/issues/341. - 'dill>=0.3.1.1,<0.3.2', 'fastavro>=0.23.6,<2', 'fasteners>=0.3,<1.0', # TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc @@ -411,6 +405,14 @@ def get_portability_package_data(): python_requires=python_requires, # BEAM-8840: Do NOT use tests_require or setup_requires. extras_require={ + 'dill': [ + # Dill doesn't have forwards-compatibility guarantees within minor + # version. Pickles created with a new version of dill may not unpickle + # using older version of dill. It is best to use the same version of + # dill on client and server, therefore list of allowed versions is + # very narrow. See: https://github.com/uqfoundation/dill/issues/341. 
+ 'dill>=0.3.1.1,<0.3.2', + ], 'docs': [ 'jinja2>=3.0,<3.2', 'Sphinx>=7.0.0,<8.0', diff --git a/sdks/python/test-suites/tox/common.gradle b/sdks/python/test-suites/tox/common.gradle index 75a12cdcf4cb..ac5dc57d8a55 100644 --- a/sdks/python/test-suites/tox/common.gradle +++ b/sdks/python/test-suites/tox/common.gradle @@ -29,6 +29,9 @@ test.dependsOn "testPy${pythonVersionSuffix}Cloud" toxTask "testPy${pythonVersionSuffix}ML", "py${pythonVersionSuffix}-ml", "${posargs}" test.dependsOn "testPy${pythonVersionSuffix}ML" +toxTask "testPy${pythonVersionSuffix}Dill", "py${pythonVersionSuffix}-dill", "${posargs}" +test.dependsOn "testPy${pythonVersionSuffix}Dill" + // toxTask "testPy${pythonVersionSuffix}Dask", "py${pythonVersionSuffix}-dask", "${posargs}" // test.dependsOn "testPy${pythonVersionSuffix}Dask" project.tasks.register("preCommitPy${pythonVersionSuffix}") { diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f344cfc61ccf..354f03a7dba6 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -571,3 +571,11 @@ commands = /bin/sh -c "pip freeze | grep -E tensorflow" # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories. bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms/embeddings' + +[testenv:py{310,312}-dill] +extras = test,dill +commands = + # Log dill version for debugging + /bin/sh -c "pip freeze | grep -E dill" + # Run all dill-specific tests + /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 1 -m uses_dill {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' From 5a2f77ec1c5b31ac2171a4cc6c23be6236fd5b85 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Sep 2025 10:05:54 -0400 Subject: [PATCH 2/8] Fix some tests that use update compat flag. 
--- sdks/python/apache_beam/internal/pickler.py | 2 +- .../io/gcp/bigquery_file_loads_test.py | 15 +++++++++++++++ sdks/python/apache_beam/pipeline_test.py | 2 ++ sdks/python/apache_beam/transforms/util_test.py | 17 ++++++++++++++++- .../apache_beam/typehints/schemas_test.py | 3 ++- sdks/python/setup.py | 9 +++++---- 6 files changed, 41 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index b4e60f4cb380..890e80a39309 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -33,7 +33,7 @@ try: from apache_beam.internal import dill_pickler except ImportError: - dill_pickler = None + dill_pickler = None # type: ignore[assignment] USE_CLOUDPICKLE = 'cloudpickle' USE_DILL = 'dill' diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 5005290ad9e8..c318b1988536 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -63,6 +63,11 @@ except ImportError: raise unittest.SkipTest('GCP dependencies are not installed') +try: + import dill +except ImportError: + dill = None + _LOGGER = logging.getLogger(__name__) _DESTINATION_ELEMENT_PAIRS = [ @@ -406,6 +411,13 @@ def test_partition_files_dofn_size_split(self): label='CheckSinglePartition') +def maybe_skip(compat_version): + if compat_version and not dill: + raise unittest.SkipTest( + 'Dill dependency not installed which is required for compat_version' + ' <= 2.67.0') + + class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp): def test_trigger_load_jobs_with_empty_files(self): destination = "project:dataset.table" @@ -485,7 +497,9 @@ def test_records_traverse_transform_with_mocks(self): param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_before_load(self, compat_version): 
+ maybe_skip(compat_version) destination = 'project1:dataset1.table1' job_reference = bigquery_api.JobReference() @@ -994,6 +1008,7 @@ def dynamic_destination_resolver(element, *side_inputs): ]) def test_triggering_frequency( self, is_streaming, with_auto_sharding, compat_version): + maybe_skip(compat_version) destination = 'project1:dataset1.table1' job_reference = bigquery_api.JobReference() diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index dc0d9a7cc58f..6e439aff5848 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -177,7 +177,9 @@ def expand(self, pcoll): _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1) @mock.patch('logging.info') + @pytest.mark.uses_dill def test_runner_overrides_default_pickler(self, mock_info): + pytest.importorskip("dill") with mock.patch.object(PipelineRunner, 'default_pickle_library_override') as mock_fn: mock_fn.return_value = 'dill' diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index b365d9b22090..66e7a9e194d3 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -83,6 +83,11 @@ from apache_beam.utils.windowed_value import PaneInfoTiming from apache_beam.utils.windowed_value import WindowedValue +try: + import dill +except ImportError: + dill = None + warnings.filterwarnings( 'ignore', category=FutureWarning, module='apache_beam.transform.util_test') @@ -112,6 +117,13 @@ def is_deterministic(self): return True +def maybe_skip(compat_version): + if compat_version and not dill: + raise unittest.SkipTest( + 'Dill dependency not installed which is required for compat_version' + ' <= 2.67.0') + + class CoGroupByKeyTest(unittest.TestCase): def test_co_group_by_key_on_tuple(self): with TestPipeline() as pipeline: @@ -997,8 +1009,10 @@ def test_reshuffle_streaming_global_window_with_buckets(self): 
param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_custom_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves pane info.""" + maybe_skip(compat_version) element_count = 12 timestamp_value = timestamp.Timestamp(0) l = [ @@ -1098,10 +1112,11 @@ def test_reshuffle_custom_window_preserves_metadata(self, compat_version): param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_default_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves timestamp, window, and pane info metadata.""" - + maybe_skip(compat_version) no_firing = PaneInfo( is_first=True, is_last=True, diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 78e04ce4487e..c24d31403b43 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -720,7 +720,8 @@ def test_named_fields_roundtrip(self, named_fields): @pytest.mark.uses_dill class PickleTest(unittest.TestCase): def setUp(self): - if PickleTest.pickler == 'dill': + # pylint: disable=access-member-before-definition + if self.pickler == 'dill': self.pickler = pytest.importorskip("dill") def test_generated_class_pickle_instance(self): diff --git a/sdks/python/setup.py b/sdks/python/setup.py index c91466c5778f..81f56f3cfafe 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -407,10 +407,11 @@ def get_portability_package_data(): extras_require={ 'dill': [ # Dill doesn't have forwards-compatibility guarantees within minor - # version. Pickles created with a new version of dill may not unpickle - # using older version of dill. It is best to use the same version of - # dill on client and server, therefore list of allowed versions is - # very narrow. See: https://github.com/uqfoundation/dill/issues/341. + # version. 
Pickles created with a new version of dill may not + # unpickle using older version of dill. It is best to use the same + # version of dill on client and server, therefore list of allowed + # versions is very narrow. + # See: https://github.com/uqfoundation/dill/issues/341. 'dill>=0.3.1.1,<0.3.2', ], 'docs': [ From 8b652b69df1e6bf8fef1f3264754ad8892b57023 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Sep 2025 18:23:21 +0000 Subject: [PATCH 3/8] Lint and revert unrelated change. --- .../.run/Run IDE with Plugin.run.xml | 2 +- sdks/python/apache_beam/coders/coders.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml b/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml index 6bbfd792f7cd..46f05949227e 100644 --- a/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml +++ b/plugins/beam-code-completion-plugin/.run/Run IDE with Plugin.run.xml @@ -40,4 +40,4 @@ false - + \ No newline at end of file diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index a9cd8ed626b6..0d94be119959 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -85,7 +85,7 @@ # occurs. from apache_beam.internal.dill_pickler import dill except ImportError: - dill = None # type: ignore + dill = None __all__ = [ 'Coder', From 0967a62095b1777336ec0a2a5fa17c64eda2d2f2 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Sep 2025 23:45:00 +0000 Subject: [PATCH 4/8] Fix lint. --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 81f56f3cfafe..4d7ba0d5a506 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -410,7 +410,7 @@ def get_portability_package_data(): # version. Pickles created with a new version of dill may not # unpickle using older version of dill. 
It is best to use the same # version of dill on client and server, therefore list of allowed - # versions is very narrow. + # versions is very narrow. # See: https://github.com/uqfoundation/dill/issues/341. 'dill>=0.3.1.1,<0.3.2', ], From e45a47425e30ded010f3d8f1219bc86eac52e671 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Sep 2025 17:00:12 +0000 Subject: [PATCH 5/8] Lint fixes. --- sdks/python/apache_beam/internal/pickler_test.py | 2 +- sdks/python/apache_beam/ml/anomaly/specifiable_test.py | 2 +- .../apache_beam/options/pipeline_options_validator_test.py | 1 - sdks/python/apache_beam/typehints/schemas_test.py | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 84c70b325649..a0135b221e8c 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -19,13 +19,13 @@ # pytype: skip-file -import pytest import random import sys import threading import types import unittest +import pytest from parameterized import param from parameterized import parameterized diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py index 0e50bf71f8fe..a222cf57973e 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py @@ -19,10 +19,10 @@ import dataclasses import logging import os -import pytest import unittest from typing import Optional +import pytest from parameterized import parameterized from apache_beam.internal.cloudpickle import cloudpickle diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py index 64397dbbc6b3..8206d45dcf03 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py +++ 
b/sdks/python/apache_beam/options/pipeline_options_validator_test.py @@ -23,7 +23,6 @@ import unittest import pytest - from hamcrest import assert_that from hamcrest import contains_string from hamcrest import only_contains diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index c24d31403b43..73db06b9a8d2 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -21,7 +21,6 @@ import itertools import pickle -import pytest import unittest from typing import Any from typing import ByteString @@ -32,6 +31,7 @@ from typing import Sequence import numpy as np +import pytest from hypothesis import given from hypothesis import settings from parameterized import parameterized From ea79bdaec0b6d9df7a21f3cff18262ce416e922f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Sep 2025 18:20:33 +0000 Subject: [PATCH 6/8] Fix messages and actually use dill when dill_unsafe flag is used. --- sdks/python/apache_beam/coders/coders.py | 7 +++--- sdks/python/apache_beam/internal/pickler.py | 22 ++++++++++++++----- .../options/pipeline_options_validator.py | 6 ++--- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0d94be119959..fe5728c0f16e 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -900,9 +900,10 @@ class DillCoder(_PickleCoderBase): """Coder using dill's pickle functionality.""" def __init__(self): if not dill: - raise RuntimeError("This pipeline contains a DillCoder which requires " \ - "the dill package. Install the dill package with the dill extra " \ - "e.g. apache-beam[dill]") + raise RuntimeError( + "This pipeline contains a DillCoder which requires " + "the dill package. Install the dill package with the dill extra " + "e.g. 
apache-beam[dill]") def _create_impl(self): return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 890e80a39309..e7b404fdc47c 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -37,6 +37,7 @@ USE_CLOUDPICKLE = 'cloudpickle' USE_DILL = 'dill' +USE_DILL_UNSAFE = 'dill_unsafe' DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE desired_pickle_lib = cloudpickle_pickler @@ -80,18 +81,27 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB): global desired_pickle_lib if selected_library == USE_DILL and not dill_pickler: - if not dill_pickler: - raise ImportError( - "Dill is not installed. To use dill pickler, install apache-beam with" - " the dill extras package e.g. apache-beam[dill].") + raise ImportError( + "Pipeline option pickle_library=dill is set, but dill is not " + "installed. Install apache-beam with the dill extras package " + "e.g. apache-beam[dill].") + if selected_library == USE_DILL_UNSAFE and not dill_pickler: + raise ImportError( + "Pipeline option pickle_library=dill_unsafe is set, but dill is not " + "installed. Install dill in job submission and runtime environments.") + + is_currently_dill = (desired_pickle_lib == dill_pickler) + dill_is_requested = ( + selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE) + # If switching to or from dill, update the pickler hook overrides. 
- if (selected_library == USE_DILL) != (desired_pickle_lib == dill_pickler): + if is_currently_dill != dill_is_requested: dill_pickler.override_pickler_hooks(selected_library == USE_DILL) if selected_library == 'default': selected_library = DEFAULT_PICKLE_LIB - if selected_library == USE_DILL: + if dill_is_requested: desired_pickle_lib = dill_pickler elif selected_library == USE_CLOUDPICKLE: desired_pickle_lib = cloudpickle_pickler diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py index c103ea8bb916..e6d46600f5ae 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator.py @@ -124,10 +124,10 @@ class PipelineOptionsValidator(object): 'with the dill extra e.g. apache-beam[gcp, dill]. Dill package was not ' 'found') ERR_UNSAFE_DILL_VERSION = ( - 'Dill version 0.3.1.1 is required when using pickle_library=dill. Other' + 'Dill version 0.3.1.1 is required when using pickle_library=dill. Other ' 'versions of dill are untested with Apache Beam. To install the supported' - 'dill version instal apache-beam[dill] extra. To use an unsupported dill ' - 'version, use pickle_library=dill_unsafe. %s') + ' dill version instal apache-beam[dill] extra. To use an unsupported ' + 'dill version, use pickle_library=dill_unsafe. %s') # GCS path specific patterns. GCS_URI = '(?P[^:]+)://(?P[^/]+)(/(?P.*))?' From 59870bf4bd840096d3797da0c68efa519c7d9a22 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Sep 2025 18:22:59 +0000 Subject: [PATCH 7/8] Trigger postcommit. 
--- .github/trigger_files/beam_PostCommit_Python.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 00e0c3c25433..8675e9535061 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 27 + "modification": 28 } From 586f21cb9cfdb487463f8b52bd103a0143b7d12a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Sep 2025 18:25:00 +0000 Subject: [PATCH 8/8] Fix docstring --- sdks/python/apache_beam/options/pipeline_options_validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py index e6d46600f5ae..0217363bc9b8 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator.py @@ -206,7 +206,7 @@ def validate_gcs_path(self, view, arg_name): return [] def validate_pickle_library(self, view): - """Validates a GCS path against gs://bucket/object URI format.""" + """Validates the pickle_library option.""" if view.pickle_library == 'default' or view.pickle_library == 'cloudpickle': return []