From e77029f900acfeae56b93a9d4d33297e3fd04723 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 9 Dec 2024 23:15:12 -0500 Subject: [PATCH 1/5] Fix typehint in ReshufflePerKey on global window setting. --- sdks/python/apache_beam/transforms/util.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a03652de2496..903e26fa202a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -75,6 +75,7 @@ from apache_beam.typehints.decorators import get_signature from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared +from apache_beam.utils.timestamp import Timestamp from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey @@ -966,7 +967,7 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) + ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. This is to mitigate the Dataflow Java Runner @@ -1005,7 +1006,6 @@ def __init__(self, num_buckets=None): generated. """ self.num_buckets = num_buckets if num_buckets else self._DEFAULT_NUM_BUCKETS - valid_buckets = isinstance(num_buckets, int) and num_buckets > 0 if not (num_buckets is None or valid_buckets): raise ValueError( @@ -1015,12 +1015,12 @@ def __init__(self, num_buckets=None): def expand(self, pcoll): # type: (pvalue.PValue) -> pvalue.PCollection return ( - pcoll | 'AddRandomKeys' >> - Map(lambda t: (random.randrange(0, self.num_buckets), t) - ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey() - | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( - Tuple[int, T]).with_output_types(T)) + pcoll | 'AddRandomKeys' >> + Map(lambda t: (random.randrange(0, self.num_buckets), t) + ).with_input_types(T).with_output_types(Tuple[int, T]) + | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(Tuple[int, T]) + | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( + Tuple[int, T]).with_output_types(T)) def to_runner_api_parameter(self, unused_context): # type: (PipelineContext) -> Tuple[str, None] From 6d60c65b9a4587356b2a4fbcc989616f18aaf150 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 16:42:25 -0500 Subject: [PATCH 2/5] Only update the type hint on global window setting. Need more work in non-global windows. --- sdks/python/apache_beam/transforms/util.py | 6 ++- .../apache_beam/transforms/util_test.py | 41 +++++++++++++++++++ sdks/python/setup.py | 3 +- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 903e26fa202a..f0db6b483d18 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -954,6 +954,8 @@ def restore_timestamps(element): window.GlobalWindows.windowed_value((key, value), timestamp) for (value, timestamp) in values ] + + ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) else: # typing: All conditional function variants must have identical signatures @@ -967,7 +969,9 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + # TODO(https://github.com/apache/beam/issues/33356): Support reshuffling + # unpicklable objects with a non-global window setting. + ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d86509c7dde3..dd1a960b2d84 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1011,6 +1011,47 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): label="formatted_after_reshuffle") + def test_reshuffle_unpicklable_in_global_window(self): + global _Unpicklable + + class _Unpicklable(object): + def __init__(self, value): + self.value = value + + def __getstate__(self): + raise NotImplementedError() + + def __setstate__(self, state): + raise NotImplementedError() + + class _UnpicklableCoder(beam.coders.Coder): + + def encode(self, value): + return str(value.value).encode() + + def decode(self, encoded): + return _Unpicklable(int(encoded.decode())) + + def to_type_hint(self): + return _Unpicklable + + def is_deterministic(self): + return True + + beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) + + with TestPipeline() as pipeline: + data = [_Unpicklable(i) for i in range(5)] + expected_data = [0, 10, 20, 30, 40] + result = ( + pipeline + | beam.Create(data) + | beam.WindowInto(GlobalWindows()) + | beam.Reshuffle() + | beam.Map(lambda u: u.value * 10)) + assert_that(result, equal_to(expected_data)) + + class WithKeysTest(unittest.TestCase): def setUp(self): self.l = [1, 2, 3] diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 53c7a532e706..2bac10b3e055 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,7 +336,8 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - ext_modules=extensions, + #ext_modules=extensions, + ext_modules=[], install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', From 9a66c9672047377a1fe9b3740ab94c06ef6ee1fd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 16:44:22 -0500 Subject: [PATCH 3/5] Apply yapf --- sdks/python/apache_beam/transforms/util.py | 16 +++++++++------- sdks/python/apache_beam/transforms/util_test.py | 2 -- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index f0db6b483d18..a160f5a24a3c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -955,7 +955,8 @@ def restore_timestamps(element): for (value, timestamp) in values ] - ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + ungrouped = pcoll | Map(reify_timestamps).with_input_types( + Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) else: # typing: All conditional function variants must have identical signatures @@ -1019,12 +1020,13 @@ def __init__(self, num_buckets=None): def expand(self, pcoll): # type: (pvalue.PValue) -> pvalue.PCollection return ( - pcoll | 'AddRandomKeys' >> - Map(lambda t: (random.randrange(0, self.num_buckets), t) - ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(Tuple[int, T]) - | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( - Tuple[int, T]).with_output_types(T)) + pcoll | 'AddRandomKeys' >> + Map(lambda t: (random.randrange(0, self.num_buckets), t) + ).with_input_types(T).with_output_types(Tuple[int, T]) + | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types( + Tuple[int, T]) + | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( + Tuple[int, T]).with_output_types(T)) def to_runner_api_parameter(self, unused_context): # type: (PipelineContext) -> Tuple[str, None] diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index dd1a960b2d84..7f166f78ef0a 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1010,7 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): equal_to(expected_data), label="formatted_after_reshuffle") - def test_reshuffle_unpicklable_in_global_window(self): global _Unpicklable @@ -1025,7 +1024,6 @@ def __setstate__(self, state): raise NotImplementedError() class _UnpicklableCoder(beam.coders.Coder): - def encode(self, value): return str(value.value).encode() From 96fe67491fc126c4d93376ce6fdc639ad946382d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 19:46:13 -0500 Subject: [PATCH 4/5] Fix some failed tests. --- sdks/python/apache_beam/transforms/util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a160f5a24a3c..9ea7f6e1dbdc 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -33,6 +33,7 @@ from typing import Callable from typing import Iterable from typing import List +from typing import Optional from typing import Tuple from typing import TypeVar from typing import Union @@ -75,10 +76,10 @@ from apache_beam.typehints.decorators import get_signature from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared -from apache_beam.utils.timestamp import Timestamp from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey +from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: from apache_beam.runners.pipeline_context import PipelineContext @@ -956,7 +957,8 @@ def restore_timestamps(element): ] ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + Tuple[K, V]).with_output_types( + Tuple[K, Tuple[V, Optional[Timestamp]]]) else: # typing: All conditional function variants must have identical signatures From 1be1ef10ae68e127a4dcde774c825cf9a4ca5935 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 20:53:00 -0500 Subject: [PATCH 5/5] Revert change to setup.py --- sdks/python/apache_beam/transforms/util.py | 1 + sdks/python/setup.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9ea7f6e1dbdc..43d4a6c20e94 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1013,6 +1013,7 @@ def __init__(self, num_buckets=None): generated. """ self.num_buckets = num_buckets if num_buckets else self._DEFAULT_NUM_BUCKETS + valid_buckets = isinstance(num_buckets, int) and num_buckets > 0 if not (num_buckets is None or valid_buckets): raise ValueError( diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 2bac10b3e055..53c7a532e706 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,8 +336,7 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - #ext_modules=extensions, - ext_modules=[], + ext_modules=extensions, install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4',