diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index ffea90e79065..6b88790521ce 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -35,6 +35,11 @@ import ( // leafCoders lists coder urns the runner knows how to manipulate. // In particular, ones that won't be a problem to parse, in general // because they have a known total size. +// +// Important: The 'leafCoders' and 'knownCompositeCoders' do not necessarily +// cover all possible standard coder types. +// For example, coderRow is neither a leaf coder nor a composite coder (so it +// will have to be LP'd). var leafCoders = map[string]struct{}{ urns.CoderBytes: {}, urns.CoderStringUTF8: {}, @@ -51,6 +56,28 @@ func isLeafCoder(c *pipepb.Coder) bool { return ok } +// knownCompositeCoders lists the coder urns whose coders are expected to have +// component coders. +var knownCompositeCoders = map[string]struct{}{ + urns.CoderKV: {}, + urns.CoderIterable: {}, + urns.CoderTimer: {}, + urns.CoderWindowedValue: {}, + urns.CoderParamWindowedValue: {}, + urns.CoderStateBackedIterable: {}, + urns.CoderCustomWindow: {}, + urns.CoderShardedKey: {}, + urns.CoderNullable: {}, + // Exclude CoderLengthPrefix from the list. Even though it is a composite coder, + // we never need to introspect its component. + // urns.CoderLengthPrefix: {}, +} + +func isKnownCompositeCoder(c *pipepb.Coder) bool { + _, ok := knownCompositeCoders[c.GetSpec().GetUrn()] + return ok +} + // makeWindowedValueCoder gets the coder for the PCollection, renders it safe, and adds it to the coders map. // // PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner @@ -123,10 +150,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, } // Add the original coder to the coders map. 
bundle[cID] = c - // If we don't know this coder, and it has no sub components, - // we must LP it, and we return the LP'd version. + // If we don't know this coder, we must LP it, and we return the LP'd version. leaf := isLeafCoder(c) - if len(c.GetComponentCoderIds()) == 0 && !leaf { + knownComposite := isKnownCompositeCoder(c) + if !leaf && !knownComposite { lpc := &pipepb.Coder{ Spec: &pipepb.FunctionSpec{ Urn: urns.CoderLengthPrefix, @@ -136,8 +163,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, bundle[lpcID] = lpc return lpcID, nil } - // We know we have a composite, so if we count this as a leaf, move everything to - // the coders map. + // If it is a leaf, move its components (if any) to the coders map. if leaf { // Copy the components from the base. for _, cc := range c.GetComponentCoderIds() { @@ -145,6 +171,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, } return cID, nil } + + // Now we have a known composite. + // We may need to LP its components. If so, we make a new composite with the + // LP'd components. var needNewComposite bool var comps []string for i, cc := range c.GetComponentCoderIds() { diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 57bd78be3068..0a08ec7d358d 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -217,11 +217,6 @@ def test_custom_window_type(self): "Requires Prism to support Custom Window Coders." + " https://github.com/apache/beam/issues/31921") - def test_pack_combiners(self): - raise unittest.SkipTest( - "Requires Prism to support coder:" + - " 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") - def test_metrics(self): super().test_metrics(check_bounded_trie=False)