diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 651a17909f54..e75fda999e14 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -109,13 +109,6 @@ def sickbayTests = [
     // ShardedKey not yet implemented.
     'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
 
-    // java.lang.IllegalStateException: Output with tag Tag must have a schema in order to call getRowReceiver
-    // Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.
-    // https://github.com/apache/beam/issues/32931
-    'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWrite',
-    'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteMultiOutput',
-    'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteWithSchemaRegistry',
-
     // Technically these tests "succeed"
     // the test is just complaining that an AssertionException isn't a RuntimeException
     //
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index d1bd33fdb935..4c92d37549e0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -198,6 +198,35 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string,
 	return cID, nil
 }
 
+// retrieveCoders adds the coder cID and, recursively, all of its direct and
+// indirect component coders from `base` to the `bundle` map, unmodified.
+// cID is looked up in `base` first, so an ID missing from `base` is an error even if `bundle` has it.
+// A coder already in `bundle` is skipped and its components are not revisited.
+func retrieveCoders(cID string, bundle, base map[string]*pipepb.Coder) error {
+	// Look up the coder in base; a missing ID is reported as an error.
+	c, ok := base[cID]
+	if !ok {
+		// Unexpected: the caller asked for a coder ID that base does not contain.
+		return fmt.Errorf("retrieveCoders: coder %q not present in base map", cID)
+	}
+
+	if _, ok := bundle[cID]; ok {
+		return nil
+	}
+	// Add the coder to bundle before recursing, so an ID reached again during the recursion is skipped.
+	bundle[cID] = c
+
+	for i, cc := range c.GetComponentCoderIds() {
+		// Retrieve each component coder as well; the recursion covers indirect components.
+		err := retrieveCoders(cc, bundle, base)
+		if err != nil {
+			return fmt.Errorf("retrieveCoders: couldn't handle component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err)
+		}
+	}
+
+	return nil
+}
+
 // reconcileCoders ensures that the bundle coders are primed with initial coders from
 // the base pipeline components.
 func reconcileCoders(bundle, base map[string]*pipepb.Coder) {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index e1e942a06f0c..a877a887ac1a 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -617,10 +617,9 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
 	// Update coders for internal collections, and add those collections to the bundle descriptor.
 	for _, pid := range stg.internalCols {
 		col := clonePColToBundle(pid)
-		if newCID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()); err == nil && col.GetCoderId() != newCID {
-			col.CoderId = newCID
-		} else if err != nil {
-			return fmt.Errorf("buildDescriptor: coder couldn't rewrite coder %q for internal pcollection %q: %w", col.GetCoderId(), pid, err)
+		// Keep the original coder of an internal pcollection: add it and its components to coders without rewriting (LP'ing) it.
+		if err := retrieveCoders(col.GetCoderId(), coders, comps.GetCoders()); err != nil {
+			return fmt.Errorf("buildDescriptor: couldn't retrieve coder %q for internal pcollection %q: %w", col.GetCoderId(), pid, err)
 		}
 	}
 	// Add coders for all windowing strategies.