From 1af87d7638f19559e824c15e94d20fee424b2c45 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 2 May 2025 23:05:17 +0000 Subject: [PATCH] Don't preserve paneinfo with default windows. --- sdks/python/apache_beam/transforms/util.py | 8 ++++---- .../apache_beam/transforms/util_test.py | 19 +++---------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 7c3a1929ba9d..ed3e83f8f2bc 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -952,7 +952,7 @@ def reify_metadata_default_window( key, value = element if timestamp == window.MIN_TIMESTAMP: timestamp = None - return key, (value, timestamp, pane_info) + return key, (value, timestamp) def restore_metadata_default_window(element): @@ -960,8 +960,8 @@ def restore_metadata_default_window(element): return [ window.GlobalWindows.windowed_value(None).with_value((key, value)) if timestamp is None else window.GlobalWindows.windowed_value( - value=(key, value), timestamp=timestamp, pane_info=pane_info) - for (value, timestamp, pane_info) in values + value=(key, value), timestamp=timestamp) + for (value, timestamp) in values ] @@ -989,7 +989,7 @@ def _reify_restore_metadata(is_default_windowing): def _add_pre_map_gkb_types(pre_gbk_map, is_default_windowing): if is_default_windowing: return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) + tuple[K, tuple[V, Optional[Timestamp]]]) return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( tuple[K, TypedWindowedValue[V]]) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index c8304255238c..e7b77046f5d0 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1082,7 +1082,8 @@ def 
test_reshuffle_custom_window_preserves_metadata(self, compat_version): param(compat_version=None), param(compat_version="2.64.0"), ]) - def test_reshuffle_default_window_preserves_metadata(self, compat_version): + def test_reshuffle_default_window_doesnt_preserve_paneinfo( + self, compat_version): """Tests that Reshuffle preserves timestamp, window, and pane info metadata.""" @@ -1107,17 +1108,7 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version): index=1, nonspeculative_index=1) - expected_preserved = [ - TestWindowedValue('a', MIN_TIMESTAMP, [GlobalWindow()], no_firing), - TestWindowedValue( - 'b', timestamp.Timestamp(0), [GlobalWindow()], on_time_only), - TestWindowedValue( - 'c', timestamp.Timestamp(33), [GlobalWindow()], late_firing), - TestWindowedValue( - 'd', GlobalWindow().max_timestamp(), [GlobalWindow()], no_firing) - ] - - expected_not_preserved = [ + expected = [ TestWindowedValue( 'a', MIN_TIMESTAMP, [GlobalWindow()], PANE_INFO_UNKNOWN), TestWindowedValue( @@ -1130,10 +1121,6 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version): PANE_INFO_UNKNOWN) ] - expected = ( - expected_preserved - if compat_version is None else expected_not_preserved) - options = PipelineOptions(update_compatibility_version=compat_version) with TestPipeline(options=options) as pipeline: # Create windowed values with specific metadata