diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index 0b34d452d42c..ab05f7bdc634 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -1,3 +1,4 @@ { - "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support", + "https://github.com/apache/beam/pull/34830": "testing" } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json new file mode 100644 index 000000000000..cdaa11075bdf --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Samza.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/34830": "testing" +} diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json new file mode 100644 index 000000000000..cdaa11075bdf --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/34830": "testing" +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 4c92d37549e0..9b8e0fe731bb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -331,6 +331,17 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i kd(r) vd(r) } + case urns.CoderWindowedValue: + ccids := c.GetComponentCoderIds() + if len(ccids) != 2 { + panic(fmt.Sprintf("WindowedValue coder with more than 2 components: %s", prototext.Format(c))) + } + ed := pullDecoderNoAlloc(coders[ccids[0]], coders) + wd := pullDecoderNoAlloc(coders[ccids[1]], coders) + return func(r io.Reader) { + ed(r) + wd(r) + } case urns.CoderRow: panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) default: diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 97fccdcda74f..9cf26743b130 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1071,6 +1071,15 @@ def test_reshuffle(self): assert_that( p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 3])) + def test_reshuffle_after_custom_window(self): + with self.create_pipeline() as p: + assert_that( + p | beam.Create([12, 2, 1]) + | beam.Map(lambda t: window.TimestampedValue(t, t)) + | beam.WindowInto(beam.transforms.window.FixedWindows(2)) + | beam.Reshuffle(), + equal_to([12, 2, 1])) + def test_flatten(self, with_transcoding=True): with self.create_pipeline() as p: if with_transcoding: diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py index 8dcd92ce503b..e46885c7c96b 100644 --- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py @@ -186,6 +186,9 @@ def test_custom_merging_window(self): def test_custom_window_type(self): raise unittest.SkipTest("https://github.com/apache/beam/issues/21049") + def test_reshuffle_after_custom_window(self): + raise unittest.SkipTest("https://github.com/apache/beam/issues/34831") + if __name__ == '__main__': # Run the tests.