Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,16 +952,16 @@ def reify_metadata_default_window(
key, value = element
if timestamp == window.MIN_TIMESTAMP:
timestamp = None
return key, (value, timestamp, pane_info)
return key, (value, timestamp)


def restore_metadata_default_window(element):
key, values = element
return [
window.GlobalWindows.windowed_value(None).with_value((key, value))
if timestamp is None else window.GlobalWindows.windowed_value(
value=(key, value), timestamp=timestamp, pane_info=pane_info)
for (value, timestamp, pane_info) in values
value=(key, value), timestamp=timestamp)
for (value, timestamp) in values
]


Expand Down Expand Up @@ -989,7 +989,7 @@ def _reify_restore_metadata(is_default_windowing):
def _add_pre_map_gkb_types(pre_gbk_map, is_default_windowing):
if is_default_windowing:
return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types(
tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]])
tuple[K, tuple[V, Optional[Timestamp]]])
return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types(
tuple[K, TypedWindowedValue[V]])

Expand Down
19 changes: 3 additions & 16 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,8 @@ def test_reshuffle_custom_window_preserves_metadata(self, compat_version):
param(compat_version=None),
param(compat_version="2.64.0"),
])
def test_reshuffle_default_window_preserves_metadata(self, compat_version):
def test_reshuffle_default_window_doesnt_preserve_paneinfo(
self, compat_version):
"""Tests that Reshuffle preserves timestamp, window, and pane info
metadata."""

Expand All @@ -1107,17 +1108,7 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version):
index=1,
nonspeculative_index=1)

expected_preserved = [
TestWindowedValue('a', MIN_TIMESTAMP, [GlobalWindow()], no_firing),
TestWindowedValue(
'b', timestamp.Timestamp(0), [GlobalWindow()], on_time_only),
TestWindowedValue(
'c', timestamp.Timestamp(33), [GlobalWindow()], late_firing),
TestWindowedValue(
'd', GlobalWindow().max_timestamp(), [GlobalWindow()], no_firing)
]

expected_not_preserved = [
expected = [
TestWindowedValue(
'a', MIN_TIMESTAMP, [GlobalWindow()], PANE_INFO_UNKNOWN),
TestWindowedValue(
Expand All @@ -1130,10 +1121,6 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version):
PANE_INFO_UNKNOWN)
]

expected = (
expected_preserved
if compat_version is None else expected_not_preserved)

options = PipelineOptions(update_compatibility_version=compat_version)
with TestPipeline(options=options) as pipeline:
# Create windowed values with specific metadata
Expand Down
Loading