Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
// leafCoders lists coder urns the runner knows how to manipulate.
// In particular, ones that won't be a problem to parse, in general
// because they have a known total size.
//
// Important: 'leafCoders' and 'knownCompositeCoders' together do not
// necessarily cover every standard coder type.
// For example, coderRow is neither a leaf coder nor a known composite coder,
// so it will have to be length-prefixed (LP'd).
var leafCoders = map[string]struct{}{
urns.CoderBytes: {},
urns.CoderStringUTF8: {},
Expand All @@ -51,6 +56,28 @@ func isLeafCoder(c *pipepb.Coder) bool {
return ok
}

// knownCompositeCoders is the set of coder urns whose spec is expected to
// carry component coders.
//
// urns.CoderLengthPrefix is deliberately absent: although it is a composite
// coder, its component never needs to be introspected.
var knownCompositeCoders = map[string]struct{}{
	urns.CoderCustomWindow:        {},
	urns.CoderIterable:            {},
	urns.CoderKV:                  {},
	urns.CoderNullable:            {},
	urns.CoderParamWindowedValue:  {},
	urns.CoderShardedKey:          {},
	urns.CoderStateBackedIterable: {},
	urns.CoderTimer:               {},
	urns.CoderWindowedValue:       {},
}

// isKnownCompositeCoder reports whether the urn of the given coder's spec is
// present in knownCompositeCoders.
func isKnownCompositeCoder(c *pipepb.Coder) bool {
	urn := c.GetSpec().GetUrn()
	_, known := knownCompositeCoders[urn]
	return known
}

// makeWindowedValueCoder gets the coder for the PCollection, renders it safe, and adds it to the coders map.
//
// PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner
Expand Down Expand Up @@ -123,10 +150,10 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string,
}
// Add the original coder to the coders map.
bundle[cID] = c
// If we don't know this coder, and it has no sub components,
// we must LP it, and we return the LP'd version.
// If we don't know this coder, we must LP it, and we return the LP'd version.
leaf := isLeafCoder(c)
if len(c.GetComponentCoderIds()) == 0 && !leaf {
knownComposite := isKnownCompositeCoder(c)
if !leaf && !knownComposite {
lpc := &pipepb.Coder{
Spec: &pipepb.FunctionSpec{
Urn: urns.CoderLengthPrefix,
Expand All @@ -136,15 +163,18 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string,
bundle[lpcID] = lpc
return lpcID, nil
}
// We know we have a composite, so if we count this as a leaf, move everything to
// the coders map.
// If it is a leaf, move its components (if any) to the coders map.
if leaf {
// Copy the components from the base.
for _, cc := range c.GetComponentCoderIds() {
bundle[cc] = base[cc]
}
return cID, nil
}

// Now we have a known composite.
// We may need to LP its components. If so, we make a new composite with the
// LP'd components.
var needNewComposite bool
var comps []string
for i, cc := range c.GetComponentCoderIds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,6 @@ def test_custom_window_type(self):
"Requires Prism to support Custom Window Coders." +
" https://github.com/apache/beam/issues/31921")

# Unconditionally skips this test by raising unittest.SkipTest. Per the skip
# message, the test requires Prism to support the 'beam:coder:tuple:v1' coder
# (tracked in https://github.com/apache/beam/issues/32636).
def test_pack_combiners(self):
raise unittest.SkipTest(
"Requires Prism to support coder:" +
" 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636")

# Runs the parent class's test_metrics with check_bounded_trie=False.
# NOTE(review): presumably the bounded-trie metric check is not supported by
# this runner yet — confirm against the parent test.
def test_metrics(self):
super().test_metrics(check_bounded_trie=False)

Expand Down
Loading