From 8732fbdfc35ced759a56d6a19307eac1ceb68148 Mon Sep 17 00:00:00 2001 From: Tianyang Hu Date: Mon, 10 Jan 2022 11:21:18 -0800 Subject: [PATCH] Remap expanded outputs after merging. --- sdks/go/pkg/beam/core/runtime/graphx/translate.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 9cfa14801dcf..edeabc2d13c9 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -156,10 +156,12 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) { // If there are external transforms that need expanding, do it now. if m.needsExpansion { - // Remap outputs of expanded external transforms to be the inputs for all downstream consumers - purgeOutputInput(edges, p) // Merge the expanded components into the existing pipeline mergeExpandedWithPipeline(edges, p) + + // Remap outputs of expanded external transforms to be the inputs for all downstream consumers + // Must happen after merging, so that the inputs in the expanded transforms are also updated. + purgeOutputInput(edges, p) } return p, nil