From 2e99b828aca5b5d873be824d30f94bc57e187559 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 10 Apr 2025 15:23:56 -0400 Subject: [PATCH 1/3] Fix coder for python tuple and potentially any non-standard composite types --- .../pkg/beam/runners/prism/internal/coders.go | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index ffea90e79065..48979c9533ae 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -35,6 +35,11 @@ import ( // leafCoders lists coder urns the runner knows how to manipulate. // In particular, ones that won't be a problem to parse, in general // because they have a known total size. +// +// Important: The 'leafCoders' and 'knownCompositeCoders' lists are not mutually +// exclusive, nor do they necessarily cover all possible standard coder types. +// For example, CoderLengthPrefix is a leaf coder even though it is also a +// known composite coder, and coderRow is neither a leaf coder nor a composite coder. var leafCoders = map[string]struct{}{ urns.CoderBytes: {}, urns.CoderStringUTF8: {}, @@ -51,6 +56,26 @@ func isLeafCoder(c *pipepb.Coder) bool { return ok } +// knownCompositeCoders lists coder urns that we expect to see components in +// their spec. +var knownCompositeCoders = map[string]struct{}{ + urns.CoderKV: {}, + urns.CoderIterable: {}, + urns.CoderTimer: {}, + urns.CoderLengthPrefix: {}, + urns.CoderWindowedValue: {}, + urns.CoderParamWindowedValue: {}, + urns.CoderStateBackedIterable: {}, + urns.CoderCustomWindow: {}, + urns.CoderShardedKey: {}, + urns.CoderNullable: {}, +} + +func isKnownCompositeCoder(c *pipepb.Coder) bool { + _, ok := knownCompositeCoders[c.GetSpec().GetUrn()] + return ok +} + // makeWindowedValueCoder gets the coder for the PCollection, renders it safe, and adds it to the coders map. // // PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner @@ -123,10 +148,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, } // Add the original coder to the coders map. bundle[cID] = c - // If we don't know this coder, and it has no sub components, - // we must LP it, and we return the LP'd version. + // If we don't know this coder, we must LP it, and we return the LP'd version. leaf := isLeafCoder(c) - if len(c.GetComponentCoderIds()) == 0 && !leaf { + knownComposite := isKnownCompositeCoder(c) + if !leaf && !knownComposite { lpc := &pipepb.Coder{ Spec: &pipepb.FunctionSpec{ Urn: urns.CoderLengthPrefix, @@ -136,8 +161,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, bundle[lpcID] = lpc return lpcID, nil } - // We know we have a composite, so if we count this as a leaf, move everything to - // the coders map. + // If it is a leaf, move its components (if any) to the coders map. if leaf { // Copy the components from the base. for _, cc := range c.GetComponentCoderIds() { @@ -145,6 +169,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, } return cID, nil } + + // Now we have a known composite. + // We may need to LP its components. If so, we make a new composite with the + // LP'd components. var needNewComposite bool var comps []string for i, cc := range c.GetComponentCoderIds() { From 00cc494501d290f24faecf86c7b06c3a41f1c6f7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 10 Apr 2025 16:20:42 -0400 Subject: [PATCH 2/3] Re-enable test_pack_combiners --- .../apache_beam/runners/portability/prism_runner_test.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 57bd78be3068..0a08ec7d358d 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -217,11 +217,6 @@ def test_custom_window_type(self): "Requires Prism to support Custom Window Coders." + " https://github.com/apache/beam/issues/31921") - def test_pack_combiners(self): - raise unittest.SkipTest( - "Requires Prism to support coder:" + - " 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") - def test_metrics(self): super().test_metrics(check_bounded_trie=False) From 28a442d30d93ec7bca900caa9d72c5d663d157bd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 10 Apr 2025 16:35:54 -0400 Subject: [PATCH 3/3] Exclude CoderLengthPrefix from knownCompositeCoders so leaf and knownComposite are now mutually exclusive. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 48979c9533ae..6b88790521ce 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -36,10 +36,10 @@ import ( // In particular, ones that won't be a problem to parse, in general // because they have a known total size. // -// Important: The 'leafCoders' and 'knownCompositeCoders' lists are not mutually -// exclusive, nor do they necessarily cover all possible standard coder types. -// For example, CoderLengthPrefix is a leaf coder even though it is also a -// known composite coder, and coderRow is neither a leaf coder nor a composite coder. +// Important: The 'leafCoders' and 'knownCompositeCoders' do not necessarily +// cover all possible standard coder types. +// For example, coderRow is neither a leaf coder nor a composite coder (so it +// will have to be LP'd). var leafCoders = map[string]struct{}{ urns.CoderBytes: {}, urns.CoderStringUTF8: {}, @@ -62,13 +62,15 @@ var knownCompositeCoders = map[string]struct{}{ urns.CoderKV: {}, urns.CoderIterable: {}, urns.CoderTimer: {}, - urns.CoderLengthPrefix: {}, urns.CoderWindowedValue: {}, urns.CoderParamWindowedValue: {}, urns.CoderStateBackedIterable: {}, urns.CoderCustomWindow: {}, urns.CoderShardedKey: {}, urns.CoderNullable: {}, + // Exclude CoderLengthPrefix from the list. Even though it is a composite coder, + // we never need to introspect its component. + // urns.CoderLengthPrefix: {}, } func isKnownCompositeCoder(c *pipepb.Coder) bool {