3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"golang.org/x/exp/slog"
)

// FromMonitoringInfos extracts metrics from monitored states and
@@ -139,7 +140,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
}
}
if len(errs) > 0 {
log.Printf("Warning: %v errors during metrics processing: %v\n", len(errs), errs)
slog.Debug("errors during metrics processing", "count", len(errs), "errors", errs)
}
return counters, distributions, gauges, msecs, pcols
}
199 changes: 199 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -0,0 +1,199 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"bytes"
"context"
"fmt"
"io"
"os"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"

dtyp "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
dcli "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
)

// TODO move environment handling to the worker package.

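// runEnvironment starts the SDK environment identified by env for the job,
// dispatching on the environment URN to either an external (loopback) worker
// pool or a docker container.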
func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error {
logger := slog.With(slog.String("envID", wk.Env))
// TODO fix broken abstraction.
// We're starting a worker pool here, because that's the loopback environment.
// It's sort of a mess, largely because of loopback, which has
// a different flow from a provisioned docker container.
e := j.Pipeline.GetComponents().GetEnvironments()[env]
switch e.GetUrn() {
case urns.EnvExternal:
ep := &pipepb.ExternalPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil {
logger.Error("unmarshaling external environment payload", "error", err)
}
go func() {
externalEnvironment(ctx, ep, wk)
slog.Debug("environment stopped", slog.String("job", j.String()))
}()
return nil
case urns.EnvDocker:
dp := &pipepb.DockerPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil {
logger.Error("unmarshaling docker environment payload", "error", err)
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
default:
return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn())
}
}

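// externalEnvironment dials the worker pool from the external payload and asks
// it to start an SDK worker pointed at this prism worker's endpoints. It blocks
// until the context is cancelled, then requests that the worker be stopped.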
func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) {
conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(fmt.Sprintf("unable to dial sdk worker %v: %v", ep.GetEndpoint().GetUrl(), err))
}
defer conn.Close()
pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn)

endpoint := &pipepb.ApiServiceDescriptor{
Url: wk.Endpoint(),
}
pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
WorkerId: wk.ID,
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
ArtifactEndpoint: endpoint,
ProvisionEndpoint: endpoint,
Params: ep.GetParams(),
})
// Job processing happens here, but orchestrated by other goroutines
// This goroutine blocks until the context is cancelled, signalling
// that the pool runner should stop the worker.
<-ctx.Done()

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
}

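// dockerEnvironment pulls the payload's container image, then creates and
// starts a container configured with this worker's endpoints, mounting local
// gcloud credentials if available. A goroutine waits on the container, killing
// it when the context is cancelled and logging its output if it exits on its own.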
func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.DockerPayload, wk *worker.W, artifactEndpoint string) error {
logger = logger.With("worker_id", wk.ID, "image", dp.GetContainerImage())

// TODO consider preserving client?
cli, err := dcli.NewClientWithOpts(dcli.FromEnv, dcli.WithAPIVersionNegotiation())
if err != nil {
return fmt.Errorf("couldn't connect to docker:%w", err)
}

// TODO abstract mounting cloud specific auths better.
const gcloudCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS"
gcloudCredsFile, ok := os.LookupEnv(gcloudCredsEnv)
var mounts []mount.Mount
var envs []string
if ok {
_, err := os.Stat(gcloudCredsFile)
// File exists
if err == nil {
dockerGcloudCredsFile := "/docker_cred_file.json"
mounts = append(mounts, mount.Mount{
Type: "bind",
Source: gcloudCredsFile,
Target: dockerGcloudCredsFile,
})
credEnv := fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile)
envs = append(envs, credEnv)
}
}

if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil {
// Copy the output, but discard it so we can wait until the image pull is finished.
io.Copy(io.Discard, rc)
rc.Close()
} else {
logger.Warn("unable to pull image", "error", err)
}

ccr, err := cli.ContainerCreate(ctx, &container.Config{
Image: dp.GetContainerImage(),
Cmd: []string{
fmt.Sprintf("--id=%v-%v", wk.JobKey, wk.Env),
fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint),
fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()),
},
Env: envs,
Tty: false,
}, &container.HostConfig{
NetworkMode: "host",
Mounts: mounts,
}, nil, nil, "")
if err != nil {
cli.Close()
return fmt.Errorf("unable to create container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err)
}
containerID := ccr.ID
logger = logger.With("container", containerID)

if err := cli.ContainerStart(ctx, containerID, dtyp.ContainerStartOptions{}); err != nil {
cli.Close()
return fmt.Errorf("unable to start container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err)
}

// Start goroutine to wait on container state.
go func() {
defer cli.Close()

statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
select {
case <-ctx.Done():
// Can't use command context, since it's already canceled here.
err := cli.ContainerKill(context.Background(), containerID, "")
if err != nil {
logger.Error("docker container kill error", "error", err)
}
case err := <-errCh:
if err != nil {
logger.Error("docker container wait error", "error", err)
}
case resp := <-statusCh:
logger.Info("docker container has self terminated", "status_code", resp.StatusCode)

rc, err := cli.ContainerLogs(ctx, containerID, dtyp.ContainerLogsOptions{Details: true, ShowStdout: true, ShowStderr: true})
if err != nil {
logger.Error("docker container logs error", "error", err)
}
defer rc.Close()
var buf bytes.Buffer
stdcopy.StdCopy(&buf, &buf, rc)
logger.Error("container self terminated", "log", buf.String())
}
}()

return nil
}
90 changes: 24 additions & 66 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -24,16 +24,13 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
)

@@ -54,30 +51,21 @@ func RunPipeline(j *jobservices.Job) {
return
}
env, _ := getOnlyPair(envs)
wk := worker.New(j.String()+"_"+env, env) // Cheating by having the worker id match the environment id.
go wk.Serve()
timeout := time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
return
}
err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout)
wk, err := makeWorker(env, j)
if err != nil {
j.Failed(err)
j.CancelFn(err)
})

return
}
// When this function exits, we cancel the context to clear
// any related job resources.
defer func() {
j.CancelFn(fmt.Errorf("runPipeline returned, cleaning up"))
}()
go runEnvironment(j.RootCtx, j, env, wk)

j.SendMsg("running " + j.String())
j.Running()

err := executePipeline(j.RootCtx, wk, j)
if err != nil {
if err := executePipeline(j.RootCtx, wk, j); err != nil {
j.Failed(err)
return
}
Expand All @@ -90,57 +78,27 @@ func RunPipeline(j *jobservices.Job) {
j.Done()
}

// TODO move environment handling to the worker package.

func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) {
// TODO fix broken abstraction.
// We're starting a worker pool here, because that's the loopback environment.
// It's sort of a mess, largely because of loopback, which has
// a different flow from a provisioned docker container.
e := j.Pipeline.GetComponents().GetEnvironments()[env]
switch e.GetUrn() {
case urns.EnvExternal:
ep := &pipepb.ExternalPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil {
slog.Error("unmarshing environment payload", err, slog.String("envID", wk.Env))
}
externalEnvironment(ctx, ep, wk)
slog.Debug("environment stopped", slog.String("envID", wk.String()), slog.String("job", j.String()))
default:
panic(fmt.Sprintf("environment %v with urn %v unimplemented", env, e.GetUrn()))
}
}

func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) {
conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(fmt.Sprintf("unable to dial sdk worker %v: %v", ep.GetEndpoint().GetUrl(), err))
}
defer conn.Close()
pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn)

endpoint := &pipepb.ApiServiceDescriptor{
Url: wk.Endpoint(),
// makeWorker creates and starts a worker for the given environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
wk := worker.New(j.String()+"_"+env, env)
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()
go wk.Serve()
if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err)
}
pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
WorkerId: wk.ID,
ControlEndpoint: endpoint,
LoggingEndpoint: endpoint,
ArtifactEndpoint: endpoint,
ProvisionEndpoint: endpoint,
Params: nil,
})

// Job processing happens here, but orchestrated by other goroutines
// This goroutine blocks until the context is cancelled, signalling
// that the pool runner should stop the worker.
<-ctx.Done()

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
// Check for connection succeeding after we've created the environment successfully.
timeout := 1 * time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
return
}
err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
j.Failed(err)
j.CancelFn(err)
})
return wk, nil
}

type transformExecuter interface {