diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 93b3bef3ce58..651a17909f54 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -109,11 +109,6 @@ def sickbayTests = [ // ShardedKey not yet implemented. 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', - // Java side dying during execution. - // Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74 - // Likely due to prism't coder changes. - 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', - // java.lang.IllegalStateException: Output with tag Tag must have a schema in order to call getRowReceiver // Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders. // https://github.com/apache/beam/issues/32931 diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index ba950de34697..988dd9ec7ed9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -88,8 +88,7 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep } func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult { - if !h.config.SDKFlatten { - t.EnvironmentId = "" // force the flatten to be a runner transform due to configuration. + if !h.config.SDKFlatten && !strings.HasPrefix(tid, "ft_") { forcedRoots := []string{tid} // Have runner side transforms be roots. // Force runner flatten consumers to be roots. @@ -109,52 +108,48 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C // they're written out to the runner in the same fashion. // This may stop being necessary once Flatten Unzipping happens in the optimizer. outPCol := comps.GetPcollections()[outColID] - outCoderID := outPCol.CoderId - outCoder := comps.GetCoders()[outCoderID] - coderSubs := map[string]*pipepb.Coder{} pcollSubs := map[string]*pipepb.PCollection{} + tSubs := map[string]*pipepb.PTransform{} - if !strings.HasPrefix(outCoderID, "cf_") { - // Create a new coder id for the flatten output PCollection and use - // this coder id for all input PCollections - outCoderID = "cf_" + outColID - outCoder = proto.Clone(outCoder).(*pipepb.Coder) - coderSubs[outCoderID] = outCoder - - pcollSubs[outColID] = proto.Clone(outPCol).(*pipepb.PCollection) - pcollSubs[outColID].CoderId = outCoderID - - outPCol = pcollSubs[outColID] - } - - for _, p := range t.GetInputs() { + ts := proto.Clone(t).(*pipepb.PTransform) + ts.EnvironmentId = "" // force the flatten to be a runner transform due to configuration. + for localID, p := range t.GetInputs() { inPCol := comps.GetPcollections()[p] if inPCol.CoderId != outPCol.CoderId { - if strings.HasPrefix(inPCol.CoderId, "cf_") { - // The input pcollection is the output of another flatten: - // e.g. [[a, b] | Flatten], c] | Flatten - // In this case, we just point the input coder id to the new flatten - // output coder, so any upstream input pcollections will use the new - // output coder. - coderSubs[inPCol.CoderId] = outCoder - } else { - // Create a substitute PCollection for this input with the flatten - // output coder id - pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection) - pcollSubs[p].CoderId = outPCol.CoderId - } + // TODO: do the following injection conditionally. + // Now we inject an SDK-side flatten between the upstream transform and + // the flatten. + // Before: upstream -> [upstream out] -> runner flatten + // After: upstream -> [upstream out] -> SDK-side flatten -> [SDK-side flatten out] -> runner flatten + // Create a PCollection sub + fColID := "fc_" + p + "_to_" + outColID + fPCol := proto.Clone(outPCol).(*pipepb.PCollection) + fPCol.CoderId = outPCol.CoderId // same coder as runner flatten + pcollSubs[fColID] = fPCol + + // Create a PTransform sub + ftID := "ft_" + p + "_to_" + outColID + ft := proto.Clone(t).(*pipepb.PTransform) + ft.EnvironmentId = t.EnvironmentId // Set environment to ensure it is a SDK-side transform + ft.Inputs = map[string]string{"0": p} + ft.Outputs = map[string]string{"0": fColID} + tSubs[ftID] = ft + + // Replace the input of runner flatten with the output of SDK-side flatten + ts.Inputs[localID] = fColID + + // Force sdk-side flattens to be roots + forcedRoots = append(forcedRoots, ftID) } } + tSubs[tid] = ts // Return the new components which is the transforms consumer return prepareResult{ // We sub this flatten with itself, to not drop it. SubbedComps: &pipepb.Components{ - Transforms: map[string]*pipepb.PTransform{ - tid: t, - }, + Transforms: tSubs, Pcollections: pcollSubs, - Coders: coderSubs, }, RemovedLeaves: nil, ForcedRoots: forcedRoots, diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index 5c64b08f1dc4..4bf7ba4dff4a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -492,6 +492,9 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa } stg.internalCols = internal + // Sort the keys of internal producers (from stageFacts.PcolProducers) + // to ensure deterministic order for stable tests. + sort.Strings(stg.internalCols) stg.outputs = maps.Values(outputs) stg.sideInputs = sideInputs diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 30f1a4c06025..a2b5b0013d7b 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -299,9 +299,16 @@ def test_sql(self): def test_flattened_side_input(self): # Blocked on support for transcoding - # https://jira.apache.org/jira/browse/BEAM-6523 + # https://github.com/apache/beam/issues/19365 super().test_flattened_side_input(with_transcoding=False) + def test_flatten_and_gbk(self): + # Blocked on support for transcoding + # https://github.com/apache/beam/issues/19365 + # Also blocked on support of flatten and groupby sharing the same input + # https://github.com/apache/beam/issues/34647 + raise unittest.SkipTest("https://github.com/apache/beam/issues/34647") + def test_metrics(self): super().test_metrics(check_gauge=False, check_bounded_trie=False) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 91b53a16bff4..97fccdcda74f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -538,6 +538,22 @@ def test_flattened_side_input(self, with_transcoding=True): equal_to([('a', 1), ('b', 2)] + third_element), label='CheckFlattenOfSideInput') + def test_flatten_and_gbk(self, with_transcoding=True): + with self.create_pipeline() as p: + side1 = p | 'side1' >> beam.Create([('a', 1)]) + if with_transcoding: + # Also test non-matching coder types (transcoding required) + second_element = [('another_type')] + else: + second_element = [('b', 2)] + side2 = p | 'side2' >> beam.Create(second_element) + + flatten_out = (side1, side2) | beam.Flatten() + gbk_out = side1 | beam.GroupByKey() + + assert_that(flatten_out, equal_to([('a', 1)] + second_element)) + assert_that(gbk_out, equal_to([('a', [1])])) + def test_gbk_side_input(self): with self.create_pipeline() as p: main = p | 'main' >> beam.Create([None]) diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py index ceb5240924b5..e01fa461cc9f 100644 --- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py @@ -142,6 +142,11 @@ def test_flattened_side_input(self): # https://github.com/apache/beam/issues/20984 super().test_flattened_side_input(with_transcoding=False) + def test_flatten_and_gbk(self): + # Blocked on support for transcoding + # https://github.com/apache/beam/issues/20984 + super().test_flatten_and_gbk(with_transcoding=False) + def test_pack_combiners(self): # Stages produced by translations.pack_combiners are fused # by translations.greedily_fuse, which prevent the stages diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py index d4879190738d..fd251021ba49 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py @@ -174,9 +174,14 @@ def test_pardo_dynamic_timer(self): def test_flattened_side_input(self): # Blocked on support for transcoding - # https://jira.apache.org/jira/browse/BEAM-7236 + # https://github.com/apache/beam/issues/19504 super().test_flattened_side_input(with_transcoding=False) + def test_flatten_and_gbk(self): + # Blocked on support for transcoding + # https://github.com/apache/beam/issues/19504 + super().test_flatten_and_gbk(with_transcoding=False) + def test_custom_merging_window(self): raise unittest.SkipTest("https://github.com/apache/beam/issues/20641")