From 146518ad655ae057b229827de03bd02dee7220b8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 16 Mar 2025 16:23:33 -0400 Subject: [PATCH 01/12] Preserve pane index through reshuffle. --- sdks/python/apache_beam/testing/util.py | 20 +- sdks/python/apache_beam/transforms/util.py | 35 ++- .../apache_beam/transforms/util_test.py | 224 +++++++++++++++++- 3 files changed, 265 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 5f2d211d2b72..cbb2119b83f6 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -23,7 +23,10 @@ import glob import io import tempfile +from typing import Any from typing import Iterable +from typing import List +from typing import NamedTuple from apache_beam import pvalue from apache_beam.transforms import window @@ -35,6 +38,8 @@ from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import ptransform_fn from apache_beam.transforms.util import CoGroupByKey +from apache_beam.utils.windowed_value import PANE_INFO_UNKNOWN +from apache_beam.utils.windowed_value import PaneInfo __all__ = [ 'assert_that', @@ -56,8 +61,11 @@ class BeamAssertException(Exception): # Used for reifying timestamps and windows for assert_that matchers. -TestWindowedValue = collections.namedtuple( - 'TestWindowedValue', 'value timestamp windows') +class TestWindowedValue(NamedTuple): + value: Any + timestamp: Any + windows: List + pane_info: PaneInfo = PANE_INFO_UNKNOWN def contains_in_any_order(iterable): @@ -290,11 +298,15 @@ def assert_that( class ReifyTimestampWindow(DoFn): def process( - self, element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): + self, + element, + timestamp=DoFn.TimestampParam, + window=DoFn.WindowParam, + pane_info=DoFn.PaneInfoParam): # This returns TestWindowedValue instead of # beam.utils.windowed_value.WindowedValue because ParDo will extract # the timestamp and window out of the latter. - return [TestWindowedValue(element, timestamp, [window])] + return [TestWindowedValue(element, timestamp, [window], pane_info)] class AddWindow(DoFn): def process(self, element, window=DoFn.WindowParam): diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 812c95c36519..6e2ec1dbe464 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -107,6 +107,7 @@ T = TypeVar('T') RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION = "2.64.0" +RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION = "2.64.0" class CoGroupByKey(PTransform): @@ -959,23 +960,36 @@ class ReshufflePerKey(PTransform): """ def expand(self, pcoll): windowing_saved = pcoll.windowing + should_keep_paneinfo = not is_compat_version_prior_to( + pcoll.pipeline.options, RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION) if windowing_saved.is_default(): # In this (common) case we can use a trivial trigger driver # and avoid the (expensive) window param. globally_windowed = window.GlobalWindows.windowed_value(None) MIN_TIMESTAMP = window.MIN_TIMESTAMP - def reify_timestamps(element, timestamp=DoFn.TimestampParam): + def reify_timestamps( + element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam): key, value = element if timestamp == MIN_TIMESTAMP: timestamp = None + if should_keep_paneinfo: + return key, (value, timestamp, pane_info) return key, (value, timestamp) def restore_timestamps(element): key, values = element + if should_keep_paneinfo: + return [ + globally_windowed.with_value((key, value)) + if timestamp is None else window.GlobalWindows.windowed_value( + value=(key, value), timestamp=timestamp, pane_info=pane_info) + for (value, timestamp, pane_info) in values + ] return [ - globally_windowed.with_value((key, value)) if timestamp is None else - window.GlobalWindows.windowed_value((key, value), timestamp) + globally_windowed.with_value((key, value)) + if timestamp is None else window.GlobalWindows.windowed_value( + value=(key, value), timestamp=timestamp) for (value, timestamp) in values ] @@ -985,14 +999,19 @@ def restore_timestamps(element): else: pre_gbk_map = Map(reify_timestamps).with_input_types( tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp]]]) + tuple[K, tuple[V, Optional[Timestamp], + windowed_value.PaneInfo]]) else: - # typing: All conditional function variants must have identical signatures - def reify_timestamps( # type: ignore[misc] - element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): + def reify_timestamps( + element, + timestamp=DoFn.TimestampParam, + window=DoFn.WindowParam, + pane_info=DoFn.PaneInfoParam): key, value = element - # Transport the window as part of the value and restore it later. + if should_keep_paneinfo: + return key, windowed_value.WindowedValue( + value, timestamp, [window], pane_info) return key, windowed_value.WindowedValue(value, timestamp, [window]) def restore_timestamps(element): diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 2443a049ddba..11bd297bd39c 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -18,6 +18,7 @@ """Unit tests for the transform.util classes.""" # pytype: skip-file +# pylint: disable=too-many-function-args import collections import importlib @@ -33,6 +34,8 @@ import pytest import pytz +from parameterized import param +from parameterized import parameterized import apache_beam as beam from apache_beam import GroupByKey @@ -75,6 +78,9 @@ from apache_beam.utils import timestamp from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP +from apache_beam.utils.windowed_value import PANE_INFO_UNKNOWN +from apache_beam.utils.windowed_value import PaneInfo +from apache_beam.utils.windowed_value import PaneInfoTiming from apache_beam.utils.windowed_value import WindowedValue warnings.filterwarnings( @@ -793,7 +799,10 @@ def test_reshuffle_windows_unchanged(self): with TestPipeline() as pipeline: data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)] expected_data = [ - TestWindowedValue(v, t - .001, [w]) + TestWindowedValue( + v, + t - .001, [w], + pane_info=PaneInfo(True, False, PaneInfoTiming.ON_TIME, 0, 0)) for (v, t, w) in [((1, contains_in_any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)), @@ -826,6 +835,7 @@ def test_reshuffle_window_fn_preserved(self): any_order = contains_in_any_order with TestPipeline() as pipeline: data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)] + expected_windows = [ TestWindowedValue(v, t, [w]) for (v, t, w) in [((1, 1), 1.0, IntervalWindow(1.0, 3.0)), ( @@ -838,7 +848,10 @@ def test_reshuffle_window_fn_preserved(self): IntervalWindow(4.0, 6.0))] ] expected_merged_windows = [ - TestWindowedValue(v, t - .001, [w]) + TestWindowedValue( + v, + t - .001, [w], + pane_info=PaneInfo(True, False, PaneInfoTiming.ON_TIME, 0, 0)) for (v, t, w) in [((1, any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)), ( (2, any_order([2, 1])), 4.0, IntervalWindow(1.0, 4.0)), ( @@ -942,6 +955,213 @@ def test_reshuffle_streaming_global_window_with_buckets(self): assert_that( after_reshuffle, equal_to(expected_data), label='after reshuffle') + @parameterized.expand([ + param(compat_version=None), + param(compat_version="2.63.0"), + ]) + def test_reshuffle_custom_window_preserves_metadata(self, compat_version): + """Tests that Reshuffle preserves pane info.""" + element_count = 12 + timestamp_value = timestamp.Timestamp(0) + l = [ + TimestampedValue(("key", i), timestamp_value) + for i in range(element_count) + ] + + expected_timestamp = GlobalWindow().max_timestamp() + expected = [ + TestWindowedValue( + ('key', [0, 1, 2]), + expected_timestamp, + [GlobalWindow()], + pane_info=PaneInfo( + is_first=True, + is_last=False, + timing=PaneInfoTiming.EARLY, # 0 + index=0, + nonspeculative_index=-1 + ) + ), + TestWindowedValue( + ('key', [3, 4, 5]), + expected_timestamp, + [GlobalWindow()], + pane_info=PaneInfo( + is_first=False, + is_last=False, + timing=PaneInfoTiming.EARLY, # 0 + index=1, + nonspeculative_index=-1 + ) + ), + TestWindowedValue( + ('key', [6, 7, 8]), + expected_timestamp, + [GlobalWindow()], + pane_info=PaneInfo( + is_first=False, + is_last=False, + timing=PaneInfoTiming.EARLY, # 0 + index=2, + nonspeculative_index=-1 + ) + ), + TestWindowedValue( + ('key', [9, 10, 11]), + expected_timestamp, + [GlobalWindow()], + pane_info=PaneInfo( + is_first=False, + is_last=False, + timing=PaneInfoTiming.EARLY, # 0 + index=3, + nonspeculative_index=-1 + ) + ) + ] if compat_version is None else ( + [ + TestWindowedValue( + ('key', [0, 1, 2]), + expected_timestamp, + [GlobalWindow()], + PANE_INFO_UNKNOWN + ), + TestWindowedValue( + ('key', [3, 4, 5]), + expected_timestamp, + [GlobalWindow()], + PANE_INFO_UNKNOWN + ), + TestWindowedValue( + ('key', [6, 7, 8]), + expected_timestamp, + [GlobalWindow()], + PANE_INFO_UNKNOWN + ), + TestWindowedValue( + ('key', [9, 10, 11]), + expected_timestamp, + [GlobalWindow()], + PANE_INFO_UNKNOWN + ) + ] + ) + + options = PipelineOptions(update_compatibility_version=compat_version) + options.view_as(StandardOptions).streaming = True + + with beam.Pipeline(runner="BundleBasedDirectRunner", options=options) as p: + stream_source = ( + TestStream().advance_watermark_to(0).advance_processing_time( + 100).add_elements(l[:element_count // 4]).advance_processing_time( + 100).advance_watermark_to(100).add_elements( + l[element_count // 4:2 * element_count // 4]). + advance_processing_time(100).advance_watermark_to(200).add_elements( + l[2 * element_count // 4:3 * element_count // + 4]).advance_processing_time( + 100).advance_watermark_to(300).add_elements( + l[3 * element_count // 4:]).advance_processing_time( + 100).advance_watermark_to_infinity()) + grouped = ( + p | stream_source + | "Rewindow" >> beam.WindowInto( + beam.window.GlobalWindows(), + trigger=trigger.Repeatedly(trigger.AfterProcessingTime(1)), + accumulation_mode=trigger.AccumulationMode.DISCARDING) + | beam.GroupByKey()) + + after_reshuffle = (grouped | 'Reshuffle' >> beam.Reshuffle()) + + assert_that( + after_reshuffle, + equal_to(expected), + label='CheckMetadataPreserved', + reify_windows=True) + + @parameterized.expand([ + param(compat_version=None), + param(compat_version="2.63.0"), + ]) + def test_reshuffle_default_window_preserves_metadata(self, compat_version): + """Tests that Reshuffle preserves timestamp, window, and pane info + metadata.""" + + no_firing = PaneInfo( + is_first=True, + is_last=True, + timing=PaneInfoTiming.UNKNOWN, + index=0, + nonspeculative_index=0) + + on_time_only = PaneInfo( + is_first=True, + is_last=True, + timing=PaneInfoTiming.ON_TIME, + index=0, + nonspeculative_index=0) + + late_firing = PaneInfo( + is_first=False, + is_last=False, + timing=PaneInfoTiming.LATE, + index=1, + nonspeculative_index=1) + + expected_preserved = [ + TestWindowedValue('a', MIN_TIMESTAMP, [GlobalWindow()], no_firing), + TestWindowedValue( + 'b', timestamp.Timestamp(0), [GlobalWindow()], on_time_only), + TestWindowedValue( + 'c', timestamp.Timestamp(33), [GlobalWindow()], late_firing), + TestWindowedValue( + 'd', GlobalWindow().max_timestamp(), [GlobalWindow()], no_firing) + ] + + expected_not_preserved = [ + TestWindowedValue( + 'a', MIN_TIMESTAMP, [GlobalWindow()], PANE_INFO_UNKNOWN), + TestWindowedValue( + 'b', timestamp.Timestamp(0), [GlobalWindow()], PANE_INFO_UNKNOWN), + TestWindowedValue( + 'c', timestamp.Timestamp(33), [GlobalWindow()], PANE_INFO_UNKNOWN), + TestWindowedValue( + 'd', + GlobalWindow().max_timestamp(), [GlobalWindow()], + PANE_INFO_UNKNOWN) + ] + + expected = ( + expected_preserved + if compat_version is None else expected_not_preserved) + + options = PipelineOptions(update_compatibility_version=compat_version) + with TestPipeline(options=options) as pipeline: + # Create PaneInfo objects + + # Create windowed values with specific metadata + elements = [ + WindowedValue('a', MIN_TIMESTAMP, [GlobalWindow()], no_firing), + WindowedValue( + 'b', timestamp.Timestamp(0), [GlobalWindow()], on_time_only), + WindowedValue( + 'c', timestamp.Timestamp(33), [GlobalWindow()], late_firing), + WindowedValue( + 'd', GlobalWindow().max_timestamp(), [GlobalWindow()], no_firing) + ] + + after_reshuffle = ( + pipeline + | 'Create' >> beam.Create(elements) + | 'Reshuffle' >> beam.Reshuffle() + # | 'ExtractAfter' >> beam.Map(extract_metadata) + ) + + assert_that( + after_reshuffle, + equal_to(expected), + label='CheckMetadataPreserved', + reify_windows=True) + @pytest.mark.it_validatesrunner def test_reshuffle_preserves_timestamps(self): with TestPipeline() as pipeline: From 4de26741613bfd78151cd33947f58721bc8ad510 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 20 Mar 2025 11:08:14 -0400 Subject: [PATCH 02/12] Fix coders. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 11 +++++++++++ sdks/go/pkg/beam/runners/prism/internal/urns/urns.go | 1 + 2 files changed, 12 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index ffea90e79065..070b951bba8d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -266,6 +266,17 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i kd(r) vd(r) } + case urns.CoderTuple: + ccids := c.GetComponentCoderIds() + decoders := make([]func(io.Reader), len(ccids)) + for i, ccid := range ccids { + decoders[i] = pullDecoderNoAlloc(coders[ccid], coders) + } + return func(r io.Reader) { + for _, dec := range decoders { + dec(r) + } + } case urns.CoderRow: panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) default: diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index 5b9b272f5f91..a87eb4877364 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -124,6 +124,7 @@ var ( CoderTimer = cdrUrn(pipepb.StandardCoders_TIMER) CoderKV = cdrUrn(pipepb.StandardCoders_KV) + CoderTuple = "beam:coder:tuple:v1" CoderLengthPrefix = cdrUrn(pipepb.StandardCoders_LENGTH_PREFIX) CoderNullable = cdrUrn(pipepb.StandardCoders_NULLABLE) CoderIterable = cdrUrn(pipepb.StandardCoders_ITERABLE) From f30477d5ecf12e80d618c6115439289492a34b73 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 21 Mar 2025 18:44:29 -0400 Subject: [PATCH 03/12] Add coder test. --- .../runners/prism/internal/coders_test.go | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index 4656a94e03ec..13c27867500f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -47,6 +47,7 @@ func Test_isLeafCoder(t *testing.T) { {urns.CoderIterable, false}, {urns.CoderRow, false}, {urns.CoderKV, false}, + {urns.CoderTuple, false}, } for _, test := range tests { undertest := &pipepb.Coder{ @@ -370,6 +371,32 @@ func Test_pullDecoder(t *testing.T) { }, }, []byte{3, 0}, + }, { + "tuple_multiple_elements", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderTuple, + }, + ComponentCoderIds: []string{"str", "num", "flag"}, + }, + map[string]*pipepb.Coder{ + "str": { + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderStringUTF8, + }, + }, + "num": { + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderVarInt, + }, + }, + "flag": { + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderBool, + }, + }, + }, + []byte{5, 'h', 'e', 'l', 'l', 'o', 42, 1}, }, } for _, test := range tests { From 45748bdd54f46adee2e1c9d8f64fa8401750834a Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 22 Mar 2025 19:23:35 -0400 Subject: [PATCH 04/12] Fix lint error. --- sdks/python/apache_beam/transforms/util.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 6e2ec1dbe464..9de1ff85f96f 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1002,8 +1002,7 @@ def restore_timestamps(element): tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) else: - - def reify_timestamps( + def reify_timestamps( # type: ignore[misc] element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam, From 182b6845f1b76077c31d3402755d8061194c8b4c Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 23 Mar 2025 21:44:50 -0400 Subject: [PATCH 05/12] Change compat version. --- sdks/python/apache_beam/transforms/util.py | 2 +- sdks/python/apache_beam/transforms/util_test.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9de1ff85f96f..617b7a47128e 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -107,7 +107,7 @@ T = TypeVar('T') RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION = "2.64.0" -RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION = "2.64.0" +RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION = "2.65.0" class CoGroupByKey(PTransform): diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 11bd297bd39c..5e3ffe118215 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -957,7 +957,7 @@ def test_reshuffle_streaming_global_window_with_buckets(self): @parameterized.expand([ param(compat_version=None), - param(compat_version="2.63.0"), + param(compat_version="2.64.0"), ]) def test_reshuffle_custom_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves pane info.""" @@ -1080,7 +1080,7 @@ def test_reshuffle_custom_window_preserves_metadata(self, compat_version): @parameterized.expand([ param(compat_version=None), - param(compat_version="2.63.0"), + param(compat_version="2.64.0"), ]) def test_reshuffle_default_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves timestamp, window, and pane info From 0b8fc760a38fb5ce3287fd1066637760983a8041 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 23 Mar 2025 23:28:16 -0400 Subject: [PATCH 06/12] refactor. --- sdks/python/apache_beam/transforms/util.py | 158 +++++++++++------- .../apache_beam/transforms/util_test.py | 4 +- 2 files changed, 97 insertions(+), 65 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 617b7a47128e..281caca8f530 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -950,6 +950,93 @@ def is_compat_version_prior_to(options, breaking_change_version): return False +def _default_window_reify_functions(should_keep_paneinfo): + globally_windowed = window.GlobalWindows.windowed_value(None) + MIN_TIMESTAMP = window.MIN_TIMESTAMP + + if should_keep_paneinfo: + + def reify_metadata( + element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam): + key, value = element + if timestamp == MIN_TIMESTAMP: + timestamp = None + return key, (value, timestamp, pane_info) + + def restore_metadata(element): + key, values = element + return [ + globally_windowed.with_value((key, value)) + if timestamp is None else window.GlobalWindows.windowed_value( + value=(key, value), timestamp=timestamp, pane_info=pane_info) + for (value, timestamp, pane_info) in values + ] + + return reify_metadata, restore_metadata + + def reify_timestamps(element, timestamp=DoFn.TimestampParam): + key, value = element + if timestamp == MIN_TIMESTAMP: + timestamp = None + return key, (value, timestamp) + + def restore_timestamps(element): + key, values = element + return [ + globally_windowed.with_value((key, value)) + if timestamp is None else window.GlobalWindows.windowed_value( + value=(key, value), timestamp=timestamp) + for (value, timestamp) in values + ] + + return reify_timestamps, restore_timestamps + + +def _custom_window_reify_functions(should_keep_paneinfo): + def restore_timestamps(element): + key, windowed_values = element + return [wv.with_value((key, wv.value)) for wv in windowed_values] + + if should_keep_paneinfo: + + def reify_metadata( + element, + timestamp=DoFn.TimestampParam, + window=DoFn.WindowParam, + pane_info=DoFn.PaneInfoParam): + key, value = element + return key, windowed_value.WindowedValue( + value, timestamp, [window], pane_info) + + return reify_metadata, restore_timestamps + + def reify_timestamps( # type: ignore[misc] + element, + timestamp=DoFn.TimestampParam, + window=DoFn.WindowParam): + key, value = element + return key, windowed_value.WindowedValue(value, timestamp, [window]) + + return reify_timestamps, restore_timestamps + + +def _reify_restore_functions(is_default_windowing, should_keep_paneinfo): + if is_default_windowing: + return _default_window_reify_functions(should_keep_paneinfo) + return _custom_window_reify_functions(should_keep_paneinfo) + + +def _add_pre_map_gkb_types( + pre_gbk_map, is_default_windowing, should_keep_typehint_change): + if not should_keep_typehint_change: + return pre_gbk_map.with_output_types(Any) + if is_default_windowing: + return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( + tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) + return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( + tuple[K, TypedWindowedValue[V]]) + + @typehints.with_input_types(tuple[K, V]) @typehints.with_output_types(tuple[K, V]) class ReshufflePerKey(PTransform): @@ -960,69 +1047,16 @@ class ReshufflePerKey(PTransform): """ def expand(self, pcoll): windowing_saved = pcoll.windowing + is_default_windowing = windowing_saved.is_default() should_keep_paneinfo = not is_compat_version_prior_to( pcoll.pipeline.options, RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION) - if windowing_saved.is_default(): - # In this (common) case we can use a trivial trigger driver - # and avoid the (expensive) window param. - globally_windowed = window.GlobalWindows.windowed_value(None) - MIN_TIMESTAMP = window.MIN_TIMESTAMP - - def reify_timestamps( - element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam): - key, value = element - if timestamp == MIN_TIMESTAMP: - timestamp = None - if should_keep_paneinfo: - return key, (value, timestamp, pane_info) - return key, (value, timestamp) - - def restore_timestamps(element): - key, values = element - if should_keep_paneinfo: - return [ - globally_windowed.with_value((key, value)) - if timestamp is None else window.GlobalWindows.windowed_value( - value=(key, value), timestamp=timestamp, pane_info=pane_info) - for (value, timestamp, pane_info) in values - ] - return [ - globally_windowed.with_value((key, value)) - if timestamp is None else window.GlobalWindows.windowed_value( - value=(key, value), timestamp=timestamp) - for (value, timestamp) in values - ] - - if is_compat_version_prior_to(pcoll.pipeline.options, - RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION): - pre_gbk_map = Map(reify_timestamps).with_output_types(Any) - else: - pre_gbk_map = Map(reify_timestamps).with_input_types( - tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp], - windowed_value.PaneInfo]]) - else: - def reify_timestamps( # type: ignore[misc] - element, - timestamp=DoFn.TimestampParam, - window=DoFn.WindowParam, - pane_info=DoFn.PaneInfoParam): - key, value = element - if should_keep_paneinfo: - return key, windowed_value.WindowedValue( - value, timestamp, [window], pane_info) - return key, windowed_value.WindowedValue(value, timestamp, [window]) - - def restore_timestamps(element): - key, windowed_values = element - return [wv.with_value((key, wv.value)) for wv in windowed_values] - - if is_compat_version_prior_to(pcoll.pipeline.options, - RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION): - pre_gbk_map = Map(reify_timestamps).with_output_types(Any) - else: - pre_gbk_map = Map(reify_timestamps).with_input_types( - tuple[K, V]).with_output_types(tuple[K, TypedWindowedValue[V]]) + reify_fn, restore_fn = _reify_restore_functions( + is_default_windowing, should_keep_paneinfo) + + should_keep_typehint_change = not is_compat_version_prior_to( + pcoll.pipeline.options, RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION) + pre_gbk_map = _add_pre_map_gkb_types( + Map(reify_fn), is_default_windowing, should_keep_typehint_change) ungrouped = pcoll | pre_gbk_map @@ -1037,7 +1071,7 @@ def restore_timestamps(element): result = ( ungrouped | GroupByKey() - | FlatMap(restore_timestamps).with_output_types(Any)) + | FlatMap(restore_fn).with_output_types(Any)) result._windowing = windowing_saved return result diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 5e3ffe118215..5c32062637f5 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1152,9 +1152,7 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version): after_reshuffle = ( pipeline | 'Create' >> beam.Create(elements) - | 'Reshuffle' >> beam.Reshuffle() - # | 'ExtractAfter' >> beam.Map(extract_metadata) - ) + | 'Reshuffle' >> beam.Reshuffle()) assert_that( after_reshuffle, From d30bd767459cc124bccd23c5eb4b21835d7ca253 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 24 Mar 2025 00:02:12 -0400 Subject: [PATCH 07/12] Fix typehint issue. --- sdks/python/apache_beam/transforms/util.py | 17 ++++++++++++++--- sdks/python/apache_beam/transforms/util_test.py | 2 -- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 281caca8f530..2fa3a099178e 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -938,6 +938,8 @@ def is_compat_version_prior_to(options, breaking_change_version): # will return False and use the behavior from the breaking change. update_compatibility_version = options.view_as( pipeline_options.StreamingOptions).update_compatibility_version + update_compatibility_version = options.view_as( + pipeline_options.StreamingOptions).update_compatibility_version if update_compatibility_version is None: return False @@ -1027,12 +1029,18 @@ def _reify_restore_functions(is_default_windowing, should_keep_paneinfo): def _add_pre_map_gkb_types( - pre_gbk_map, is_default_windowing, should_keep_typehint_change): + pre_gbk_map, + is_default_windowing, + should_keep_typehint_change, + should_keep_paneinfo): if not should_keep_typehint_change: return pre_gbk_map.with_output_types(Any) if is_default_windowing: + if should_keep_paneinfo: + return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( + tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) + tuple[K, tuple[V, Optional[Timestamp]]]) return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( tuple[K, TypedWindowedValue[V]]) @@ -1056,7 +1064,10 @@ def expand(self, pcoll): should_keep_typehint_change = not is_compat_version_prior_to( pcoll.pipeline.options, RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION) pre_gbk_map = _add_pre_map_gkb_types( - Map(reify_fn), is_default_windowing, should_keep_typehint_change) + Map(reify_fn), + is_default_windowing, + should_keep_typehint_change, + should_keep_paneinfo) ungrouped = pcoll | pre_gbk_map diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 5c32062637f5..3ba1298672d5 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1136,8 +1136,6 @@ def test_reshuffle_default_window_preserves_metadata(self, compat_version): options = PipelineOptions(update_compatibility_version=compat_version) with TestPipeline(options=options) as pipeline: - # Create PaneInfo objects - # Create windowed values with specific metadata elements = [ WindowedValue('a', MIN_TIMESTAMP, [GlobalWindow()], no_firing), From a8ce9186ba4357935cf758a2c7bbca3f662b9e34 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 24 Mar 2025 00:03:48 -0400 Subject: [PATCH 08/12] Use fn api runner. --- sdks/python/apache_beam/transforms/util_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 3ba1298672d5..c8304255238c 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1050,7 +1050,7 @@ def test_reshuffle_custom_window_preserves_metadata(self, compat_version): options = PipelineOptions(update_compatibility_version=compat_version) options.view_as(StandardOptions).streaming = True - with beam.Pipeline(runner="BundleBasedDirectRunner", options=options) as p: + with beam.Pipeline(options=options) as p: stream_source = ( TestStream().advance_watermark_to(0).advance_processing_time( 100).add_elements(l[:element_count // 4]).advance_processing_time( From db0aa824461610463df455fee69ca52b0b8ba3f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 24 Mar 2025 11:37:25 -0400 Subject: [PATCH 09/12] Fix lint error. --- sdks/python/apache_beam/transforms/util.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 2fa3a099178e..7e75e7a7ead7 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1012,10 +1012,8 @@ def reify_metadata( return reify_metadata, restore_timestamps - def reify_timestamps( # type: ignore[misc] - element, - timestamp=DoFn.TimestampParam, - window=DoFn.WindowParam): + def reify_timestamps( + element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): key, value = element return key, windowed_value.WindowedValue(value, timestamp, [window]) From ee64c2c7152dcda7562f1804e42cb50da564dc80 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 26 Mar 2025 19:40:19 -0400 Subject: [PATCH 10/12] Refactor. --- .../runners/portability/prism_runner_test.py | 5 - sdks/python/apache_beam/transforms/util.py | 189 ++++++++++-------- 2 files changed, 102 insertions(+), 92 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 1e8880a60a35..00ed5126503b 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -203,11 +203,6 @@ def test_custom_window_type(self): "Requires Prism to support Custom Window Coders." + " https://github.com/apache/beam/issues/31921") - def test_pack_combiners(self): - raise unittest.SkipTest( - "Requires Prism to support coder:" + - " 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") - def test_metrics(self): super().test_metrics(check_bounded_trie=False) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 7e75e7a7ead7..eb7844dd262c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -107,7 +107,6 @@ T = TypeVar('T') RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION = "2.64.0" -RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION = "2.65.0" class CoGroupByKey(PTransform): @@ -952,93 +951,49 @@ def is_compat_version_prior_to(options, breaking_change_version): return False -def _default_window_reify_functions(should_keep_paneinfo): - globally_windowed = window.GlobalWindows.windowed_value(None) - MIN_TIMESTAMP = window.MIN_TIMESTAMP +def reify_metadata_default_window( + element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam): + key, value = element + if timestamp == window.MIN_TIMESTAMP: + timestamp = None + return key, (value, timestamp, pane_info) - if should_keep_paneinfo: - def reify_metadata( - element, timestamp=DoFn.TimestampParam, pane_info=DoFn.PaneInfoParam): - key, value = element - if timestamp == MIN_TIMESTAMP: - timestamp = None - return key, (value, timestamp, pane_info) - - def restore_metadata(element): - key, values = element - return [ - globally_windowed.with_value((key, value)) - if timestamp is None else window.GlobalWindows.windowed_value( - value=(key, value), timestamp=timestamp, pane_info=pane_info) - for (value, timestamp, pane_info) in values - ] - - return reify_metadata, restore_metadata - - def reify_timestamps(element, timestamp=DoFn.TimestampParam): - key, value = element - if timestamp == MIN_TIMESTAMP: - timestamp = None - return key, (value, timestamp) - - def restore_timestamps(element): - key, values = element - return [ - globally_windowed.with_value((key, value)) - if timestamp is None else window.GlobalWindows.windowed_value( - value=(key, value), timestamp=timestamp) - for (value, timestamp) in values - ] - - return reify_timestamps, restore_timestamps - - -def _custom_window_reify_functions(should_keep_paneinfo): - def restore_timestamps(element): - key, windowed_values = element - return [wv.with_value((key, wv.value)) for wv in windowed_values] +def restore_metadata_default_window(element): + key, values = element + return [ + window.GlobalWindows.windowed_value(None).with_value((key, value)) + if timestamp is None else window.GlobalWindows.windowed_value( + value=(key, value), timestamp=timestamp, pane_info=pane_info) + for (value, timestamp, pane_info) in values + ] - if should_keep_paneinfo: - - def reify_metadata( - element, - timestamp=DoFn.TimestampParam, - window=DoFn.WindowParam, - pane_info=DoFn.PaneInfoParam): - key, value = element - return key, windowed_value.WindowedValue( - value, timestamp, [window], pane_info) - return reify_metadata, restore_timestamps +def reify_metadata_custom_window( + element, + timestamp=DoFn.TimestampParam, + window=DoFn.WindowParam, + pane_info=DoFn.PaneInfoParam): + key, value = element + return key, windowed_value.WindowedValue( + value, timestamp, [window], pane_info) - def reify_timestamps( - element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): - key, value = element - return key, windowed_value.WindowedValue(value, timestamp, [window]) - return reify_timestamps, restore_timestamps +def restore_metadata_custom_window(element): + key, windowed_values = element + return [wv.with_value((key, wv.value)) for wv in windowed_values] -def _reify_restore_functions(is_default_windowing, should_keep_paneinfo): +def _reify_restore_metadata(is_default_windowing): if is_default_windowing: - return _default_window_reify_functions(should_keep_paneinfo) - return _custom_window_reify_functions(should_keep_paneinfo) + return reify_metadata_default_window, restore_metadata_default_window + return reify_metadata_custom_window, restore_metadata_custom_window -def _add_pre_map_gkb_types( - pre_gbk_map, - is_default_windowing, - should_keep_typehint_change, - should_keep_paneinfo): - if not should_keep_typehint_change: - return pre_gbk_map.with_output_types(Any) +def _add_pre_map_gkb_types(pre_gbk_map, is_default_windowing): if is_default_windowing: - if should_keep_paneinfo: - return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( - tuple[K, tuple[V, Optional[Timestamp]]]) + tuple[K, tuple[V, Optional[Timestamp], windowed_value.PaneInfo]]) return pre_gbk_map.with_input_types(tuple[K, V]).with_output_types( tuple[K, TypedWindowedValue[V]]) @@ -1051,21 +1006,81 @@ class ReshufflePerKey(PTransform): in particular checkpointing, and preventing fusion of the surrounding transforms. """ + def expand_2_64_0(self, pcoll): + windowing_saved = pcoll.windowing + if windowing_saved.is_default(): + # In this (common) case we can use a trivial trigger driver + # and avoid the (expensive) window param. + globally_windowed = window.GlobalWindows.windowed_value(None) + MIN_TIMESTAMP = window.MIN_TIMESTAMP + + def reify_timestamps(element, timestamp=DoFn.TimestampParam): + key, value = element + if timestamp == MIN_TIMESTAMP: + timestamp = None + return key, (value, timestamp) + + def restore_timestamps(element): + key, values = element + return [ + globally_windowed.with_value((key, value)) if timestamp is None else + window.GlobalWindows.windowed_value((key, value), timestamp) + for (value, timestamp) in values + ] + + if is_compat_version_prior_to(pcoll.pipeline.options, + RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION): + pre_gbk_map = Map(reify_timestamps).with_output_types(Any) + else: + pre_gbk_map = Map(reify_timestamps).with_input_types( + tuple[K, V]).with_output_types( + tuple[K, tuple[V, Optional[Timestamp]]]) + else: + + # typing: All conditional function variants must have identical signatures + def reify_timestamps( # type: ignore[misc] + element, timestamp=DoFn.TimestampParam, window=DoFn.WindowParam): + key, value = element + # Transport the window as part of the value and restore it later. + return key, windowed_value.WindowedValue(value, timestamp, [window]) + + def restore_timestamps(element): + key, windowed_values = element + return [wv.with_value((key, wv.value)) for wv in windowed_values] + + if is_compat_version_prior_to(pcoll.pipeline.options, + RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION): + pre_gbk_map = Map(reify_timestamps).with_output_types(Any) + else: + pre_gbk_map = Map(reify_timestamps).with_input_types( + tuple[K, V]).with_output_types(tuple[K, TypedWindowedValue[V]]) + + ungrouped = pcoll | pre_gbk_map + + # TODO(https://github.com/apache/beam/issues/19785) Using global window as + # one of the standard window. This is to mitigate the Dataflow Java Runner + # Harness limitation to accept only standard coders. + ungrouped._windowing = Windowing( + window.GlobalWindows(), + triggerfn=Always(), + accumulation_mode=AccumulationMode.DISCARDING, + timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST) + result = ( + ungrouped + | GroupByKey() + | FlatMap(restore_timestamps).with_output_types(Any)) + result._windowing = windowing_saved + return result + def expand(self, pcoll): + if is_compat_version_prior_to(pcoll.pipeline.options, "2.65.0"): + return self.expand_2_64_0(pcoll) + windowing_saved = pcoll.windowing is_default_windowing = windowing_saved.is_default() - should_keep_paneinfo = not is_compat_version_prior_to( - pcoll.pipeline.options, RESHUFFLE_PANE_INDEX_BREAKING_CHANGE_VERSION) - reify_fn, restore_fn = _reify_restore_functions( - is_default_windowing, should_keep_paneinfo) - - should_keep_typehint_change = not is_compat_version_prior_to( - pcoll.pipeline.options, RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION) - pre_gbk_map = _add_pre_map_gkb_types( - Map(reify_fn), - is_default_windowing, - should_keep_typehint_change, - should_keep_paneinfo) + reify_fn, restore_fn = _reify_restore_metadata(is_default_windowing) + + pre_gbk_map = _add_pre_map_gkb_types(Map(reify_fn), is_default_windowing) ungrouped = pcoll | pre_gbk_map From f1bc5091330e9faeeab0066a4df61fa421ba1b90 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 26 Mar 2025 19:44:26 -0400 Subject: [PATCH 11/12] Remove strange duplciation. --- sdks/python/apache_beam/transforms/util.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index eb7844dd262c..7c3a1929ba9d 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -933,10 +933,6 @@ def is_compat_version_prior_to(options, breaking_change_version): # keep the old behavior prior to a breaking change or use the new behavior. # - If update_compatibility_version < breaking_change_version, we will return # True and keep the old behavior. - # - If update_compatibility_version is None or >= breaking_change_version, we - # will return False and use the behavior from the breaking change. - update_compatibility_version = options.view_as( - pipeline_options.StreamingOptions).update_compatibility_version update_compatibility_version = options.view_as( pipeline_options.StreamingOptions).update_compatibility_version From 32f560745137897e57997055c4d840d29d4b812e Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 11 Apr 2025 16:45:56 -0400 Subject: [PATCH 12/12] Revert prism coder changes. --- .../pkg/beam/runners/prism/internal/coders.go | 11 -------- .../runners/prism/internal/coders_test.go | 27 ------------------- .../beam/runners/prism/internal/urns/urns.go | 1 - 3 files changed, 39 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index ec3d8c6c6297..6b88790521ce 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -296,17 +296,6 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i kd(r) vd(r) } - case urns.CoderTuple: - ccids := c.GetComponentCoderIds() - decoders := make([]func(io.Reader), len(ccids)) - for i, ccid := range ccids { - decoders[i] = pullDecoderNoAlloc(coders[ccid], coders) - } - return func(r io.Reader) { - for _, dec := range decoders { - dec(r) - } - } case urns.CoderRow: panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) default: diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index 13c27867500f..4656a94e03ec 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -47,7 +47,6 @@ func Test_isLeafCoder(t *testing.T) { {urns.CoderIterable, false}, {urns.CoderRow, false}, {urns.CoderKV, false}, - {urns.CoderTuple, false}, } for _, test := range tests { undertest := &pipepb.Coder{ @@ -371,32 +370,6 @@ func Test_pullDecoder(t *testing.T) { }, }, []byte{3, 0}, - }, { - "tuple_multiple_elements", - &pipepb.Coder{ - Spec: &pipepb.FunctionSpec{ - Urn: urns.CoderTuple, - }, - ComponentCoderIds: []string{"str", "num", "flag"}, - }, - map[string]*pipepb.Coder{ - "str": { - Spec: &pipepb.FunctionSpec{ - Urn: urns.CoderStringUTF8, - }, - }, - "num": { - Spec: &pipepb.FunctionSpec{ - Urn: urns.CoderVarInt, - }, - }, - "flag": { - Spec: &pipepb.FunctionSpec{ - Urn: urns.CoderBool, - }, - }, - }, - []byte{5, 'h', 'e', 'l', 'l', 'o', 42, 1}, }, } for _, test := range tests { diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index a87eb4877364..5b9b272f5f91 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -124,7 +124,6 @@ var ( CoderTimer = cdrUrn(pipepb.StandardCoders_TIMER) CoderKV = cdrUrn(pipepb.StandardCoders_KV) - CoderTuple = "beam:coder:tuple:v1" CoderLengthPrefix = cdrUrn(pipepb.StandardCoders_LENGTH_PREFIX) CoderNullable = cdrUrn(pipepb.StandardCoders_NULLABLE) CoderIterable = cdrUrn(pipepb.StandardCoders_ITERABLE)