diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 340eb0b7eb5f..e58bb8f180ed 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -32,6 +32,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" ) // This file retains the logic for the pardo handler @@ -107,12 +108,12 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C // they're written out to the runner in the same fashion. // This may stop being necessary once Flatten Unzipping happens in the optimizer. outPCol := comps.GetPcollections()[outColID] - outCoder := comps.GetCoders()[outPCol.GetCoderId()] - coderSubs := map[string]*pipepb.Coder{} + pcollSubs := map[string]*pipepb.PCollection{} for _, p := range t.GetInputs() { inPCol := comps.GetPcollections()[p] if inPCol.CoderId != outPCol.CoderId { - coderSubs[inPCol.CoderId] = outCoder + pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection) + pcollSubs[p].CoderId = outPCol.CoderId } } @@ -123,7 +124,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C Transforms: map[string]*pipepb.PTransform{ tid: t, }, - Coders: coderSubs, + Pcollections: pcollSubs, }, RemovedLeaves: nil, ForcedRoots: forcedRoots,