Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

default:
// Composites can often have some unknown urn, permit those.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1",
// but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1",
// as well as the deprecated "beam:transform:read:v1", but they are composites.
// We don't do anything special with these high level composites, but
// we may be dealing with their internal subgraph already, so we ignore this transform.
if len(t.GetSubtransforms()) > 0 {
continue
}
// But if not, fail.
// This may be an "empty" composite without subtransforms or a payload.
// These just do PCollection manipulation which is already represented in the Pipeline graph.
// Simply ignore the composite at this stage, since the runner does nothing with them.
if len(t.GetSpec().GetPayload()) == 0 {
continue
}
// Otherwise fail.
slog.Warn("unknown transform, with payload", "urn", urn, "name", t.GetUniqueName(), "payload", t.GetSpec().GetPayload())
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
Expand Down