diff --git a/CHANGES.md b/CHANGES.md index e98d05cdb711..e5125e643aed 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,6 +82,8 @@ * Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542)) * The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572)) * Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651)) + * Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705)) + * This improves support for developing Xlang pipelines, when using a compatible cross language service. ## Breaking Changes diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 89a2edabdbbe..3239c76dfe1f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -23,6 +23,7 @@ import ( "log/slog" "os" "os/exec" + "slices" "time" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -46,16 +47,26 @@ import ( func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error { logger := j.Logger.With(slog.String("envID", wk.Env)) - // TODO fix broken abstraction. - // We're starting a worker pool here, because that's the loopback environment. - // It's sort of a mess, largely because of loopback, which has - // a different flow from a provisioned docker container. e := j.Pipeline.GetComponents().GetEnvironments()[env] + + if e.GetUrn() == urns.EnvAnyOf { + // We've been given a choice! + ap := &pipepb.AnyOfEnvironmentPayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ap); err != nil { + logger.Error("unmarshaling any environment payload", "error", err) + return err + } + e = selectAnyOfEnv(ap) + logger.Info("AnyEnv resolved", "selectedUrn", e.GetUrn(), "worker", wk.ID) + // Process the environment as normal. + } + switch e.GetUrn() { case urns.EnvExternal: ep := &pipepb.ExternalPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil { - logger.Error("unmarshing external environment payload", "error", err) + logger.Error("unmarshaling external environment payload", "error", err) + return err } go func() { externalEnvironment(ctx, ep, wk) @@ -65,13 +76,15 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor case urns.EnvDocker: dp := &pipepb.DockerPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil { - logger.Error("unmarshing docker environment payload", "error", err) + logger.Error("unmarshaling docker environment payload", "error", err) + return err } return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint()) case urns.EnvProcess: pp := &pipepb.ProcessPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil { - logger.Error("unmarshing docker environment payload", "error", err) + logger.Error("unmarshaling process environment payload", "error", err) + return err } go func() { processEnvironment(ctx, pp, wk) @@ -83,6 +96,33 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor } } +func selectAnyOfEnv(ap *pipepb.AnyOfEnvironmentPayload) *pipepb.Environment { + // Prefer external, then process, then docker, unknown environments are 0. + ranks := map[string]int{ + urns.EnvDocker: 1, + urns.EnvProcess: 5, + urns.EnvExternal: 10, + } + + envs := ap.GetEnvironments() + + slices.SortStableFunc(envs, func(a, b *pipepb.Environment) int { + rankA := ranks[a.GetUrn()] + rankB := ranks[b.GetUrn()] + + // Reverse the comparison so our favourite is at the front + switch { + case rankA > rankB: + return -1 // Usually "greater than" would be 1 + case rankA < rankB: + return 1 + } + return 0 + }) + // Pick our favourite. + return envs[0] +} + func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) { conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments_test.go b/sdks/go/pkg/beam/runners/prism/internal/environments_test.go new file mode 100644 index 000000000000..368c514efd1b --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/environments_test.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "testing" + + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" +) + +func TestSelectAnyOf(t *testing.T) { + tests := []struct { + name, want string + wantTag string + envs []*pipepb.Environment + }{ + {name: "singleDefault", want: urns.EnvDefault, envs: []*pipepb.Environment{{Urn: urns.EnvDefault}}}, + {name: "singleDocker", want: urns.EnvDocker, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}}}, + {name: "singleProcess", want: urns.EnvProcess, envs: []*pipepb.Environment{{Urn: urns.EnvProcess}}}, + {name: "singleExternal", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvExternal}}}, + {name: "multiplePickExternal_1", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvExternal}, {Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}}}, + {name: "multiplePickExternal_2", want: urns.EnvExternal, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}, {Urn: urns.EnvExternal}}}, + {name: "multiplePickProcess", want: urns.EnvProcess, envs: []*pipepb.Environment{{Urn: urns.EnvDocker}, {Urn: urns.EnvProcess}}}, + {name: "multiplePickDocker", want: urns.EnvDocker, envs: []*pipepb.Environment{{Urn: urns.EnvDefault}, {Urn: urns.EnvDocker}}}, + {name: "multiplePickFirstExternal", want: urns.EnvExternal, wantTag: "first", envs: []*pipepb.Environment{{Urn: urns.EnvExternal, Payload: []byte("first")}, {Urn: urns.EnvExternal, Payload: []byte("second")}}}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + selected := selectAnyOfEnv(&pipepb.AnyOfEnvironmentPayload{Environments: test.envs}) + if selected.GetUrn() != test.want { + t.Errorf("expected %v, got %v", test.want, selected.GetUrn()) + } + if got, want := string(selected.GetPayload()), test.wantTag; got != want { + t.Errorf("expected payload with tag %v, got %v", want, got) + } + }) + } + +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index 170073b72419..5b9b272f5f91 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -147,4 +147,5 @@ var ( EnvProcess = envUrn(pipepb.StandardEnvironments_PROCESS) EnvExternal = envUrn(pipepb.StandardEnvironments_EXTERNAL) EnvDefault = envUrn(pipepb.StandardEnvironments_DEFAULT) + EnvAnyOf = envUrn(pipepb.StandardEnvironments_ANYOF) )