From 1df938154131d2c53e262d320d3fa2848b19f42e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 3 May 2025 10:56:56 -0400 Subject: [PATCH 1/4] Support windowed value coder in prism. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 4c92d37549e0..9b8e0fe731bb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -331,6 +331,17 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i kd(r) vd(r) } + case urns.CoderWindowedValue: + ccids := c.GetComponentCoderIds() + if len(ccids) != 2 { + panic(fmt.Sprintf("WindowedValue coder with more than 2 components: %s", prototext.Format(c))) + } + ed := pullDecoderNoAlloc(coders[ccids[0]], coders) + wd := pullDecoderNoAlloc(coders[ccids[1]], coders) + return func(r io.Reader) { + ed(r) + wd(r) + } case urns.CoderRow: panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) default: From c059c162e0c137d9850125b740db57bce6ec5a4a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 3 May 2025 17:31:39 -0400 Subject: [PATCH 2/4] Add a new test case that previously failed to run in prism. --- .../runners/portability/fn_api_runner/fn_runner_test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 97fccdcda74f..86f47c7f268f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1071,6 +1071,14 @@ def test_reshuffle(self): assert_that( p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 3])) + def test_reshuffle_after_custom_window(self): + with self.create_pipeline() as p: + assert_that( + p | beam.Create([12, 2, 1]) + | beam.Map(lambda t: window.TimestampedValue(t, t)) + | beam.WindowInto(beam.transforms.window.FixedWindows(2)) + | beam.Reshuffle(), equal_to([12, 2, 1])) + def test_flatten(self, with_transcoding=True): with self.create_pipeline() as p: if with_transcoding: From fd8b8c27a7c2c5c62bca5f989f409c28ab9aad98 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 3 May 2025 17:56:51 -0400 Subject: [PATCH 3/4] Fix lints and trigger some validatesrunner tests. --- .../beam_PostCommit_Python_ValidatesRunner_Flink.json | 3 ++- .../beam_PostCommit_Python_ValidatesRunner_Samza.json | 3 +++ .../beam_PostCommit_Python_ValidatesRunner_Spark.json | 3 +++ .../runners/portability/fn_api_runner/fn_runner_test.py | 3 ++- 4 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json create mode 100644 .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index 0b34d452d42c..ab05f7bdc634 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -1,3 +1,4 @@ { - "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support", + "https://github.com/apache/beam/pull/34830": "testing" } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json new file mode 100644 index 000000000000..cdaa11075bdf --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/34830": "testing" +} diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json new file mode 100644 index 000000000000..cdaa11075bdf --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/34830": "testing" +} diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 86f47c7f268f..9cf26743b130 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1077,7 +1077,8 @@ def test_reshuffle_after_custom_window(self): p | beam.Create([12, 2, 1]) | beam.Map(lambda t: window.TimestampedValue(t, t)) | beam.WindowInto(beam.transforms.window.FixedWindows(2)) - | beam.Reshuffle(), equal_to([12, 2, 1])) + | beam.Reshuffle(), + equal_to([12, 2, 1])) def test_flatten(self, with_transcoding=True): with self.create_pipeline() as p: From d33ac3c351fc69134d4814f6dfb628b8981434b5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 3 May 2025 19:00:42 -0400 Subject: [PATCH 4/4] Disable the newly added test for samza runner. --- .../apache_beam/runners/portability/samza_runner_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py index 8dcd92ce503b..e46885c7c96b 100644 --- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py @@ -186,6 +186,9 @@ def test_custom_merging_window(self): def test_custom_window_type(self): raise unittest.SkipTest("https://github.com/apache/beam/issues/21049") + def test_reshuffle_after_custom_window(self): + raise unittest.SkipTest("https://github.com/apache/beam/issues/34831") + if __name__ == '__main__': # Run the tests.