diff --git a/sdks/go/pkg/beam/core/runtime/exec/hash.go b/sdks/go/pkg/beam/core/runtime/exec/hash.go index 353d203d4e48..7b540bcf1594 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/hash.go +++ b/sdks/go/pkg/beam/core/runtime/exec/hash.go @@ -37,6 +37,13 @@ type elementHasher interface { func makeElementHasher(c *coder.Coder, wc *coder.WindowCoder) elementHasher { hasher := &maphash.Hash{} we := MakeWindowEncoder(wc) + + // Unwrap length prefix coders. + // A length prefix changes the hash itself, but shouldn't affect + // that identical elements have the same hash, so skip them here. + if c.Kind == coder.LP { + c = c.Components[0] + } switch c.Kind { case coder.Bytes: return &bytesHasher{hash: hasher, we: we} diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 9d1c8481d65e..da374c96cafe 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -326,6 +326,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng coders := map[string]*pipepb.Coder{} transforms := map[string]*pipepb.PTransform{} + pcollections := map[string]*pipepb.PCollection{} + + clonePColToBundle := func(pid string) *pipepb.PCollection { + col := proto.Clone(comps.GetPcollections()[pid]).(*pipepb.PCollection) + pcollections[pid] = col + return col + } for _, tid := range stg.transforms { t := comps.GetTransforms()[tid] @@ -408,7 +415,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng sink2Col := map[string]string{} col2Coders := map[string]engine.PColInfo{} for _, o := range stg.outputs { - col := comps.GetPcollections()[o.Global] + col := clonePColToBundle(o.Global) wOutCid, err := makeWindowedValueCoder(o.Global, comps, coders) if err != nil { return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for output %+v, pcol %q %v:\n%w %v", stg.ID, o, o.Global, prototext.Format(col), err, stg.transforms) @@ -435,7 +442,8 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng var prepareSides []func(b *worker.B, watermark mtime.Time) for _, si := range stg.sideInputs { - col := comps.GetPcollections()[si.Global] + col := clonePColToBundle(si.Global) + oCID := col.GetCoderId() nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders()) if err != nil { @@ -444,7 +452,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng if oCID != nCID { // Add a synthetic PCollection set with the new coder. newGlobal := si.Global + "_prismside" - comps.GetPcollections()[newGlobal] = &pipepb.PCollection{ + pcollections[newGlobal] = &pipepb.PCollection{ DisplayData: col.GetDisplayData(), UniqueName: col.GetUniqueName(), CoderId: nCID, @@ -467,7 +475,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng // coders used by side inputs to the coders map for the bundle, so // needs to be run for every ID. - col := comps.GetPcollections()[stg.primaryInput] + col := clonePColToBundle(stg.primaryInput) + if newCID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()); err == nil && col.GetCoderId() != newCID { + col.CoderId = newCID + } else if err != nil { + return fmt.Errorf("buildDescriptor: couldn't rewrite coder %q for primary input pcollection %q: %w", col.GetCoderId(), stg.primaryInput, err) + } + wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders) if err != nil { return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w\n%v", stg.ID, stg.primaryInput, prototext.Format(col), err, stg.transforms) @@ -491,9 +505,14 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng stg.inputTransformID = stg.ID + "_source" transforms[stg.inputTransformID] = sourceTransform(stg.inputTransformID, portFor(wInCid, wk), stg.primaryInput) - // Add coders for internal collections. + // Update coders for internal collections, and add those collections to the bundle descriptor. for _, pid := range stg.internalCols { - lpUnknownCoders(comps.GetPcollections()[pid].GetCoderId(), coders, comps.GetCoders()) + col := clonePColToBundle(pid) + if newCID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()); err == nil && col.GetCoderId() != newCID { + col.CoderId = newCID + } else if err != nil { + return fmt.Errorf("buildDescriptor: coder couldn't rewrite coder %q for internal pcollection %q: %w", col.GetCoderId(), pid, err) + } } // Add coders for all windowing strategies. // TODO: filter PCollections, filter windowing strategies by Pcollections instead. @@ -514,7 +533,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng Id: stg.ID, Transforms: transforms, WindowingStrategies: comps.GetWindowingStrategies(), - Pcollections: comps.GetPcollections(), + Pcollections: pcollections, Coders: coders, StateApiServiceDescriptor: &pipepb.ApiServiceDescriptor{ Url: wk.Endpoint(),