From e7297b6b38dc6aa5696351bdbcf19e8077c710c3 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Tue, 15 Apr 2025 23:22:42 -0400
Subject: [PATCH 1/5] A more general fix for handling flatten by injecting an
 SDK-side flatten.

---
 .../runners/prism/internal/handlerunner.go | 67 +++++++++----------
 1 file changed, 31 insertions(+), 36 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index ba950de34697..988dd9ec7ed9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -88,8 +88,7 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep
 }
 
 func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {
-	if !h.config.SDKFlatten {
-		t.EnvironmentId = "" // force the flatten to be a runner transform due to configuration.
+	if !h.config.SDKFlatten && !strings.HasPrefix(tid, "ft_") {
 		forcedRoots := []string{tid} // Have runner side transforms be roots.
 
 		// Force runner flatten consumers to be roots.
@@ -109,52 +108,48 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
 		// they're written out to the runner in the same fashion.
 		// This may stop being necessary once Flatten Unzipping happens in the optimizer.
 		outPCol := comps.GetPcollections()[outColID]
-		outCoderID := outPCol.CoderId
-		outCoder := comps.GetCoders()[outCoderID]
-		coderSubs := map[string]*pipepb.Coder{}
 		pcollSubs := map[string]*pipepb.PCollection{}
+		tSubs := map[string]*pipepb.PTransform{}
 
-		if !strings.HasPrefix(outCoderID, "cf_") {
-			// Create a new coder id for the flatten output PCollection and use
-			// this coder id for all input PCollections
-			outCoderID = "cf_" + outColID
-			outCoder = proto.Clone(outCoder).(*pipepb.Coder)
-			coderSubs[outCoderID] = outCoder
-
-			pcollSubs[outColID] = proto.Clone(outPCol).(*pipepb.PCollection)
-			pcollSubs[outColID].CoderId = outCoderID
-
-			outPCol = pcollSubs[outColID]
-		}
-
-		for _, p := range t.GetInputs() {
+		ts := proto.Clone(t).(*pipepb.PTransform)
+		ts.EnvironmentId = "" // force the flatten to be a runner transform due to configuration.
+		for localID, p := range t.GetInputs() {
 			inPCol := comps.GetPcollections()[p]
 			if inPCol.CoderId != outPCol.CoderId {
-				if strings.HasPrefix(inPCol.CoderId, "cf_") {
-					// The input pcollection is the output of another flatten:
-					// e.g. [[a, b] | Flatten], c] | Flatten
-					// In this case, we just point the input coder id to the new flatten
-					// output coder, so any upstream input pcollections will use the new
-					// output coder.
-					coderSubs[inPCol.CoderId] = outCoder
-				} else {
-					// Create a substitute PCollection for this input with the flatten
-					// output coder id
-					pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection)
-					pcollSubs[p].CoderId = outPCol.CoderId
-				}
+				// TODO: do the following injection conditionally.
+				// Now we inject an SDK-side flatten between the upstream transform and
+				// the flatten.
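
Illustrative note (not part of the patch): the hunk above rewires a runner
flatten so that each mismatched input first passes through an injected
SDK-side flatten. A minimal, standalone Go sketch of that substitution
follows; the transform and PCollection IDs (f1, p1, p2, out, go_env) are
hypothetical, and the injection is shown for every input, whereas the patch
only injects when an input's coder differs from the flatten's output coder.

package main

import (
	"fmt"

	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Hypothetical runner flatten "f1": two inputs (p1, p2), one output (out).
	flatten := &pipepb.PTransform{
		UniqueName:    "Flatten",
		EnvironmentId: "go_env",
		Inputs:        map[string]string{"0": "p1", "1": "p2"},
		Outputs:       map[string]string{"0": "out"},
	}

	// Runner-side copy: clearing the environment makes prism execute it itself.
	ts := proto.Clone(flatten).(*pipepb.PTransform)
	ts.EnvironmentId = ""
	tSubs := map[string]*pipepb.PTransform{"f1": ts}

	for localID, p := range flatten.GetInputs() {
		fColID := "fc_" + p + "_to_out"
		ftID := "ft_" + p + "_to_out"

		// SDK-side copy: it keeps the environment, so the SDK harness runs it.
		ft := proto.Clone(flatten).(*pipepb.PTransform)
		ft.Inputs = map[string]string{"0": p}
		ft.Outputs = map[string]string{"0": fColID}
		tSubs[ftID] = ft

		// The runner flatten now consumes the injected flatten's output.
		ts.Inputs[localID] = fColID
	}

	for id, t := range tSubs {
		fmt.Println(id, t.GetInputs(), "->", t.GetOutputs())
	}
}

The injected transform IDs carry the "ft_" prefix, which is why handleFlatten
now skips the runner-flatten handling for IDs with that prefix: the injected
flattens must remain SDK-side rather than being turned into runner flattens
themselves.
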
+				// Before: upstream -> [upstream out] -> runner flatten
+				// After: upstream -> [upstream out] -> SDK-side flatten -> [SDK-side flatten out] -> runner flatten
+				// Create a PCollection sub
+				fColID := "fc_" + p + "_to_" + outColID
+				fPCol := proto.Clone(outPCol).(*pipepb.PCollection)
+				fPCol.CoderId = outPCol.CoderId // same coder as runner flatten
+				pcollSubs[fColID] = fPCol
+
+				// Create a PTransform sub
+				ftID := "ft_" + p + "_to_" + outColID
+				ft := proto.Clone(t).(*pipepb.PTransform)
+				ft.EnvironmentId = t.EnvironmentId // Set environment to ensure it is an SDK-side transform
+				ft.Inputs = map[string]string{"0": p}
+				ft.Outputs = map[string]string{"0": fColID}
+				tSubs[ftID] = ft
+
+				// Replace the input of the runner flatten with the output of the SDK-side flatten
+				ts.Inputs[localID] = fColID
+
+				// Force SDK-side flattens to be roots
+				forcedRoots = append(forcedRoots, ftID)
 			}
 		}
+		tSubs[tid] = ts
 
 		// Return the new components which is the transforms consumer
 		return prepareResult{
 			// We sub this flatten with itself, to not drop it.
 			SubbedComps: &pipepb.Components{
-				Transforms: map[string]*pipepb.PTransform{
-					tid: t,
-				},
+				Transforms:   tSubs,
 				Pcollections: pcollSubs,
-				Coders:       coderSubs,
 			},
 			RemovedLeaves: nil,
 			ForcedRoots:   forcedRoots,

From af47d015d6e062503e0af7a5ae33c6dcb80b8ece Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Tue, 15 Apr 2025 23:24:57 -0400
Subject: [PATCH 2/5] Re-enable a previously failing flatten test in Java.

---
 runners/prism/java/build.gradle | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 93b3bef3ce58..651a17909f54 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -109,11 +109,6 @@ def sickbayTests = [
     // ShardedKey not yet implemented.
    'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
 
-    // Java side dying during execution.
-    // Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
-    // Likely due to prism't coder changes.
-    'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',
-
     // java.lang.IllegalStateException: Output with tag Tag must have a schema in order to call getRowReceiver
     // Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.
     // https://github.com/apache/beam/issues/32931

From df362c7bc21dc5454942fff8a0af631892702c4f Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Wed, 16 Apr 2025 11:16:48 -0400
Subject: [PATCH 3/5] Add a new test to cover another case that would crash
 prism prior to the fix.

The test is also included in the test suites of Flink, Samza, and Spark, but
without transcoding until their corresponding feature requests are resolved.
---
 .../runners/portability/flink_runner_test.py    |  7 ++++++-
 .../portability/fn_api_runner/fn_runner_test.py | 16 ++++++++++++++++
 .../runners/portability/samza_runner_test.py    |  5 +++++
 .../runners/portability/spark_runner_test.py    |  7 ++++++-
 4 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 30f1a4c06025..d4e4f6dd82f5 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -299,9 +299,14 @@ def test_sql(self):
 
   def test_flattened_side_input(self):
     # Blocked on support for transcoding
-    # https://jira.apache.org/jira/browse/BEAM-6523
+    # https://github.com/apache/beam/issues/19365
     super().test_flattened_side_input(with_transcoding=False)
 
+  def test_flatten_and_gbk(self):
+    # Blocked on support for transcoding
+    # https://github.com/apache/beam/issues/19365
+    super().test_flatten_and_gbk(with_transcoding=False)
+
   def test_metrics(self):
     super().test_metrics(check_gauge=False, check_bounded_trie=False)
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 91b53a16bff4..97fccdcda74f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -538,6 +538,22 @@ def test_flattened_side_input(self, with_transcoding=True):
           equal_to([('a', 1), ('b', 2)] + third_element),
           label='CheckFlattenOfSideInput')
 
+  def test_flatten_and_gbk(self, with_transcoding=True):
+    with self.create_pipeline() as p:
+      side1 = p | 'side1' >> beam.Create([('a', 1)])
+      if with_transcoding:
+        # Also test non-matching coder types (transcoding required)
+        second_element = [('another_type')]
+      else:
+        second_element = [('b', 2)]
+      side2 = p | 'side2' >> beam.Create(second_element)
+
+      flatten_out = (side1, side2) | beam.Flatten()
+      gbk_out = side1 | beam.GroupByKey()
+
+      assert_that(flatten_out, equal_to([('a', 1)] + second_element))
+      assert_that(gbk_out, equal_to([('a', [1])]))
+
   def test_gbk_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
index ceb5240924b5..e01fa461cc9f 100644
--- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
@@ -142,6 +142,11 @@ def test_flattened_side_input(self):
     # https://github.com/apache/beam/issues/20984
     super().test_flattened_side_input(with_transcoding=False)
 
+  def test_flatten_and_gbk(self):
+    # Blocked on support for transcoding
+    # https://github.com/apache/beam/issues/20984
+    super().test_flatten_and_gbk(with_transcoding=False)
+
   def test_pack_combiners(self):
     # Stages produced by translations.pack_combiners are fused
     # by translations.greedily_fuse, which prevent the stages
diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py
index d4879190738d..fd251021ba49 100644
--- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py
@@ -174,9 +174,14 @@ def test_pardo_dynamic_timer(self):
 
   def test_flattened_side_input(self):
     # Blocked on support for transcoding
-    # https://jira.apache.org/jira/browse/BEAM-7236
+    # https://github.com/apache/beam/issues/19504
     super().test_flattened_side_input(with_transcoding=False)
 
+  def test_flatten_and_gbk(self):
+    # Blocked on support for transcoding
+    # https://github.com/apache/beam/issues/19504
+    super().test_flatten_and_gbk(with_transcoding=False)
+
   def test_custom_merging_window(self):
     raise unittest.SkipTest("https://github.com/apache/beam/issues/20641")
 
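
Illustrative note (not part of the patch): the new test_flatten_and_gbk above
exercises a pipeline in which a Flatten and a GroupByKey consume the same
input. Assuming a portable runner such as prism is the target, roughly the
same shape can be written with the Go SDK as follows; the DoFns and element
values are made up for illustration.

package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func emitA(_ []byte, emit func(string, int)) { emit("a", 1) }
func emitB(_ []byte, emit func(string, int)) { emit("b", 2) }

func main() {
	flag.Parse()
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	// side1 feeds both the Flatten and the GroupByKey, mirroring the Python test.
	side1 := beam.ParDo(s, emitA, beam.Impulse(s))
	side2 := beam.ParDo(s, emitB, beam.Impulse(s))

	_ = beam.Flatten(s, side1, side2)
	_ = beam.GroupByKey(s, side1)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatal(err)
	}
}
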
From 369859becce7e08decf55842953ad72db2b505ca Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Wed, 16 Apr 2025 13:06:09 -0400
Subject: [PATCH 4/5] Fix a flaky test by sorting the keys of internal producers.

---
 sdks/go/pkg/beam/runners/prism/internal/preprocess.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index 5c64b08f1dc4..4bf7ba4dff4a 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -492,6 +492,9 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
 
 	}
 	stg.internalCols = internal
+	// Sort the keys of internal producers (from stageFacts.PcolProducers)
+	// to ensure deterministic order for stable tests.
+	sort.Strings(stg.internalCols)
 	stg.outputs = maps.Values(outputs)
 	stg.sideInputs = sideInputs
 
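
Illustrative note (not part of the patch): the sort above exists because Go
map iteration order is not deterministic, so the stage's internal-collection
list could come out in a different order on every run. A small sketch, with a
made-up producers map standing in for the stage facts:

package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/maps"
)

func main() {
	// Hypothetical PCollection-id -> producing-transform map.
	producers := map[string]string{"pcol_b": "t2", "pcol_a": "t1", "pcol_c": "t3"}

	// maps.Keys returns the keys in an unspecified order, which is what made
	// assertions on stg.internalCols flaky.
	internal := maps.Keys(producers)

	// Sorting pins a stable order, matching the added sort.Strings call.
	sort.Strings(internal)
	fmt.Println(internal) // [pcol_a pcol_b pcol_c]
}
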
From 52827b7ada20ba674c38dee04ad09284380d859c Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Wed, 16 Apr 2025 14:56:11 -0400
Subject: [PATCH 5/5] Skip the new flatten-GBK test in the Flink runner.

---
 .../apache_beam/runners/portability/flink_runner_test.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index d4e4f6dd82f5..a2b5b0013d7b 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -305,7 +305,9 @@ def test_flattened_side_input(self):
   def test_flatten_and_gbk(self):
     # Blocked on support for transcoding
     # https://github.com/apache/beam/issues/19365
-    super().test_flatten_and_gbk(with_transcoding=False)
+    # Also blocked on support for flatten and groupby sharing the same input
+    # https://github.com/apache/beam/issues/34647
+    raise unittest.SkipTest("https://github.com/apache/beam/issues/34647")
 
   def test_metrics(self):
     super().test_metrics(check_gauge=False, check_bounded_trie=False)