From a345148abc1eb3f490f2eae5e6bba0d8853ae11c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 8 Apr 2025 16:28:49 -0400 Subject: [PATCH 1/3] Replace coder substitution with pcoll substitution in handling flatten. --- sdks/go/pkg/beam/runners/prism/internal/handlerunner.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 340eb0b7eb5f..e58bb8f180ed 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -32,6 +32,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" ) // This file retains the logic for the pardo handler @@ -107,12 +108,12 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C // they're written out to the runner in the same fashion. // This may stop being necessary once Flatten Unzipping happens in the optimizer. outPCol := comps.GetPcollections()[outColID] - outCoder := comps.GetCoders()[outPCol.GetCoderId()] - coderSubs := map[string]*pipepb.Coder{} + pcollSubs := map[string]*pipepb.PCollection{} for _, p := range t.GetInputs() { inPCol := comps.GetPcollections()[p] if inPCol.CoderId != outPCol.CoderId { - coderSubs[inPCol.CoderId] = outCoder + pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection) + pcollSubs[p].CoderId = outPCol.CoderId } } @@ -123,7 +124,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C Transforms: map[string]*pipepb.PTransform{ tid: t, }, - Coders: coderSubs, + Pcollections: pcollSubs, }, RemovedLeaves: nil, ForcedRoots: forcedRoots, From 581bb0ea769ff5d01f6c3e57995159171c80cb91 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 8 Apr 2025 16:30:26 -0400 Subject: [PATCH 2/3] Re-enable some tests for flatten. --- runners/prism/java/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 1d58a568c432..712296d4e190 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -111,10 +111,10 @@ def sickbayTests = [ // Java side dying during execution. // https://github.com/apache/beam/issues/32930 - 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', + // 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', // Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74 // Likely due to prism't coder changes. - 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', + // 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', // java.lang.IllegalStateException: Output with tag Tag must have a schema in order to call getRowReceiver // Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders. From 0ce6930911bddc8faf1ed1250efdfe8eee2991f8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 9 Apr 2025 09:10:08 -0400 Subject: [PATCH 3/3] Revert "Re-enable some tests for flatten." This reverts commit 581bb0ea769ff5d01f6c3e57995159171c80cb91. --- runners/prism/java/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 712296d4e190..1d58a568c432 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -111,10 +111,10 @@ def sickbayTests = [ // Java side dying during execution. // https://github.com/apache/beam/issues/32930 - // 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', + 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', // Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74 // Likely due to prism't coder changes. - // 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', + 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', // java.lang.IllegalStateException: Output with tag Tag must have a schema in order to call getRowReceiver // Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.