diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 9cfa14801dcf..edeabc2d13c9 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -156,10 +156,12 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) { // If there are external transforms that need expanding, do it now. if m.needsExpansion { - // Remap outputs of expanded external transforms to be the inputs for all downstream consumers - purgeOutputInput(edges, p) // Merge the expanded components into the existing pipeline mergeExpandedWithPipeline(edges, p) + + // Remap outputs of expanded external transforms to be the inputs for all downstream consumers + // Must happen after merging, so that the inputs in the expanded transforms are also updated. + purgeOutputInput(edges, p) } return p, nil