From a1bbdb54b6f303e18cedfad64eaf0d14c123a672 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 13 Sep 2023 10:32:45 -0700 Subject: [PATCH 01/10] Move envs to own file. Err, not panic. --- .../runners/prism/internal/environments.go | 100 ++++++++++++++++++ .../beam/runners/prism/internal/execute.go | 86 ++++----------- 2 files changed, 120 insertions(+), 66 deletions(-) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/environments.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go new file mode 100644 index 000000000000..ec56346b1b04 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package internal + +import ( + "context" + "fmt" + + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" + "golang.org/x/exp/slog" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" +) + +// TODO move environment handling to the worker package. + +func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error { + // TODO fix broken abstraction. + // We're starting a worker pool here, because that's the loopback environment. + // It's sort of a mess, largely because of loopback, which has + // a different flow from a provisioned docker container. + e := j.Pipeline.GetComponents().GetEnvironments()[env] + switch e.GetUrn() { + case urns.EnvExternal: + ep := &pipepb.ExternalPayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil { + slog.Error("unmarshing external environment payload", err, slog.String("envID", wk.Env)) + } + go func() { + externalEnvironment(ctx, ep, wk) + slog.Debug("environment stopped", slog.String("envID", wk.String()), slog.String("job", j.String())) + }() + return nil + case urns.EnvDocker: + dp := &pipepb.DockerPayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil { + slog.Error("unmarshing docker environment payload", err, slog.String("envID", wk.Env)) + } + + // TODO: Check if the image exists. + // TODO: Download if the image doesn't exist. + // TODO: Ensure artifact server downloads to a consistent cache, and doesn't re-download artifacts. + // Ensure Go worker rebuilds are consistent? 
+ // TODO: Fail sensibly when the image can't be downloaded or started without process crash. + return fmt.Errorf("environment %v with urn %v unimplemented:e\n%v payload\n%v ", env, e.GetUrn(), prototext.Format(e), prototext.Format(dp)) + default: + return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn()) + } +} + +func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) { + conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + panic(fmt.Sprintf("unable to dial sdk worker %v: %v", ep.GetEndpoint().GetUrl(), err)) + } + defer conn.Close() + pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn) + + endpoint := &pipepb.ApiServiceDescriptor{ + Url: wk.Endpoint(), + } + pool.StartWorker(ctx, &fnpb.StartWorkerRequest{ + WorkerId: wk.ID, + ControlEndpoint: endpoint, + LoggingEndpoint: endpoint, + ArtifactEndpoint: endpoint, + ProvisionEndpoint: endpoint, + Params: nil, + }) + + // Job processing happens here, but orchestrated by other goroutines + // This goroutine blocks until the context is cancelled, signalling + // that the pool runner should stop the worker. + <-ctx.Done() + + // Previous context cancelled so we need a new one + // for this request. 
+ pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{ + WorkerId: wk.ID, + }) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index b2f9d866603a..38294517fc7e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -24,7 +24,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" - fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" @@ -32,8 +31,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "golang.org/x/exp/maps" "golang.org/x/exp/slog" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" ) @@ -54,30 +51,21 @@ func RunPipeline(j *jobservices.Job) { return } env, _ := getOnlyPair(envs) - wk := worker.New(j.String()+"_"+env, env) // Cheating by having the worker id match the environment id. - go wk.Serve() - timeout := time.Minute - time.AfterFunc(timeout, func() { - if wk.Connected() { - return - } - err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout) + wk, err := makeWorker(env, j) + if err != nil { j.Failed(err) - j.CancelFn(err) - }) - + return + } // When this function exits, we cancel the context to clear // any related job resources. 
defer func() { j.CancelFn(fmt.Errorf("runPipeline returned, cleaning up")) }() - go runEnvironment(j.RootCtx, j, env, wk) j.SendMsg("running " + j.String()) j.Running() - err := executePipeline(j.RootCtx, wk, j) - if err != nil { + if err := executePipeline(j.RootCtx, wk, j); err != nil { j.Failed(err) return } @@ -90,57 +78,23 @@ func RunPipeline(j *jobservices.Job) { j.Done() } -// TODO move environment handling to the worker package. - -func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) { - // TODO fix broken abstraction. - // We're starting a worker pool here, because that's the loopback environment. - // It's sort of a mess, largely because of loopback, which has - // a different flow from a provisioned docker container. - e := j.Pipeline.GetComponents().GetEnvironments()[env] - switch e.GetUrn() { - case urns.EnvExternal: - ep := &pipepb.ExternalPayload{} - if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil { - slog.Error("unmarshing environment payload", err, slog.String("envID", wk.Env)) +// makeWorker creates a worker for that environment. +func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { + wk := worker.New(j.String()+"_"+env, env) // Cheating by having the worker id match the environment id. 
+ go wk.Serve() + timeout := time.Minute + time.AfterFunc(timeout, func() { + if wk.Connected() { + return } - externalEnvironment(ctx, ep, wk) - slog.Debug("environment stopped", slog.String("envID", wk.String()), slog.String("job", j.String())) - default: - panic(fmt.Sprintf("environment %v with urn %v unimplemented", env, e.GetUrn())) - } -} - -func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) { - conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - panic(fmt.Sprintf("unable to dial sdk worker %v: %v", ep.GetEndpoint().GetUrl(), err)) - } - defer conn.Close() - pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn) - - endpoint := &pipepb.ApiServiceDescriptor{ - Url: wk.Endpoint(), - } - pool.StartWorker(ctx, &fnpb.StartWorkerRequest{ - WorkerId: wk.ID, - ControlEndpoint: endpoint, - LoggingEndpoint: endpoint, - ArtifactEndpoint: endpoint, - ProvisionEndpoint: endpoint, - Params: nil, - }) - - // Job processing happens here, but orchestrated by other goroutines - // This goroutine blocks until the context is cancelled, signalling - // that the pool runner should stop the worker. - <-ctx.Done() - - // Previous context cancelled so we need a new one - // for this request. 
- pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{ - WorkerId: wk.ID, + err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout) + j.Failed(err) + j.CancelFn(err) }) + if err := runEnvironment(j.RootCtx, j, env, wk); err != nil { + return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err) + } + return wk, nil } type transformExecuter interface { From 95a3a6a8d7a6ce25c58220de9e9a40e2dc09d82c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 13 Sep 2023 17:32:41 -0700 Subject: [PATCH 02/10] artifact and docker finagling --- .../runners/prism/internal/environments.go | 111 ++++++++++++++++-- .../beam/runners/prism/internal/execute.go | 9 +- .../prism/internal/jobservices/artifact.go | 50 +++++++- .../runners/prism/internal/jobservices/job.go | 10 ++ .../prism/internal/jobservices/management.go | 15 +-- .../prism/internal/jobservices/server.go | 5 + .../prism/internal/worker/bundle_test.go | 2 +- .../runners/prism/internal/worker/worker.go | 22 +++- .../prism/internal/worker/worker_test.go | 10 +- .../beam/runners/universal/runnerlib/stage.go | 2 +- 10 files changed, 196 insertions(+), 40 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index ec56346b1b04..0578afecad4f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -16,8 +16,13 @@ package internal import ( + "bytes" "context" "fmt" + "os" + "os/exec" + "strings" + "time" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" @@ -27,13 +32,13 @@ import ( "golang.org/x/exp/slog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" ) // TODO move 
environment handling to the worker package. func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error { + logger := slog.With(slog.String("envID", wk.Env)) // TODO fix broken abstraction. // We're starting a worker pool here, because that's the loopback environment. // It's sort of a mess, largely because of loopback, which has @@ -43,25 +48,19 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor case urns.EnvExternal: ep := &pipepb.ExternalPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil { - slog.Error("unmarshing external environment payload", err, slog.String("envID", wk.Env)) + logger.Error("unmarshing external environment payload", "error", err) } go func() { externalEnvironment(ctx, ep, wk) - slog.Debug("environment stopped", slog.String("envID", wk.String()), slog.String("job", j.String())) + slog.Debug("environment stopped", slog.String("job", j.String())) }() return nil case urns.EnvDocker: dp := &pipepb.DockerPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil { - slog.Error("unmarshing docker environment payload", err, slog.String("envID", wk.Env)) + logger.Error("unmarshing docker environment payload", "error", err) } - - // TODO: Check if the image exists. - // TODO: Download if the image doesn't exist. - // TODO: Ensure artifact server downloads to a consistent cache, and doesn't re-download artifacts. - // Ensure Go worker rebuilds are consistent? - // TODO: Fail sensibly when the image can't be downloaded or started without process crash. 
- return fmt.Errorf("environment %v with urn %v unimplemented:e\n%v payload\n%v ", env, e.GetUrn(), prototext.Format(e), prototext.Format(dp)) + return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint()) default: return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn()) } @@ -84,9 +83,8 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo LoggingEndpoint: endpoint, ArtifactEndpoint: endpoint, ProvisionEndpoint: endpoint, - Params: nil, + Params: ep.GetParams(), }) - // Job processing happens here, but orchestrated by other goroutines // This goroutine blocks until the context is cancelled, signalling // that the pool runner should stop the worker. @@ -98,3 +96,90 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo WorkerId: wk.ID, }) } + +func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.DockerPayload, wk *worker.W, artifactEndpoint string) error { + logger = logger.With("worker_id", wk.ID, "image", dp.GetContainerImage()) + // TODO: Ensure artifact server downloads to a consistent cache, and doesn't re-download artifacts. + // Ensure Go worker rebuilds are consistent? + // TODO: Fail sensibly when the image can't be downloaded or started without process crash. + + var credentialArgs []string + // TODO better abstract cloud specific auths. 
+ const gcloudCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS" + gcloudCredsFile, ok := os.LookupEnv(gcloudCredsEnv) + if ok { + _, err := os.Stat(gcloudCredsFile) + // File exists + if err == nil { + dockerGcloudCredsFile := "/docker_cred_file.json" + credentialArgs = append(credentialArgs, + "--mount", + fmt.Sprintf("type=bind,source=%v,target=%v", gcloudCredsFile, dockerGcloudCredsFile), + "--env", + fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile), + ) + } + } + + logger.Info("attempting to pull docker image for environment") + pullCmd := exec.CommandContext(ctx, "docker", "pull", dp.GetContainerImage()) + pullCmd.Start() + pullCmd.Wait() + + runArgs := []string{"run", "-d", "--network=host"} + runArgs = append(runArgs, credentialArgs...) + runArgs = append(runArgs, + dp.GetContainerImage(), + fmt.Sprintf("--id=%v-%v", wk.JobKey, wk.Env), + fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), + fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), + ) + + runCmd := exec.CommandContext(ctx, "docker", runArgs...) 
+ slog.Info("docker run command", "run_cmd", runCmd.String()) + var buf bytes.Buffer + runCmd.Stdout = &buf + if err := runCmd.Start(); err != nil { + return fmt.Errorf("unable to start container image %v with docker for env %v", dp.GetContainerImage(), wk.Env) + } + if err := runCmd.Wait(); err != nil { + return fmt.Errorf("docker run failed for image %v with docker for env %v", dp.GetContainerImage(), wk.Env) + } + + containerID := strings.TrimSpace(buf.String()) + if containerID == "" { + return fmt.Errorf("docker run failed for image %v with docker for env %v - no container ID", dp.GetContainerImage(), wk.Env) + } + logger.Info("docker container is started", "container_id", containerID) + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + buf.Reset() + inspectCmd := exec.CommandContext(ctx, "docker", "inspect", "-f", "{{.State.Status}}", containerID) + inspectCmd.Stdout = &buf + inspectCmd.Start() + inspectCmd.Wait() + + status := strings.TrimSpace(buf.String()) + switch status { + case "running": + logger.Info("docker container is running", "container_id", containerID) + return nil + case "dead", "exited": + logDumpCmd := exec.CommandContext(ctx, "docker", "container", "logs", containerID) + buf.Reset() + logDumpCmd.Stdout = &buf + logDumpCmd.Stderr = &buf + logDumpCmd.Start() + logDumpCmd.Wait() + logger.Error("SDK failed to start.", "final_status", status, "log", buf.String()) + return fmt.Errorf("docker run failed for image %v with docker for env %v - containerID %v - log:\n%v", dp.GetContainerImage(), wk.Env, containerID, buf.String()) + default: + logger.Info("docker container status", "container_id", containerID, "status", status) + } + } + return nil +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 38294517fc7e..e97abba29808 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -80,14 +80,17 @@ func RunPipeline(j *jobservices.Job) { // makeWorker creates a worker for that environment. func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { - wk := worker.New(j.String()+"_"+env, env) // Cheating by having the worker id match the environment id. + envPb := j.Pipeline.GetComponents().GetEnvironments()[env] + wk := worker.New(j.String()+"_"+env, env, envPb) // Cheating by having the worker id match the environment id. + wk.JobKey = j.JobKey() + wk.ArtifactEndpoint = j.ArtifactEndpoint() go wk.Serve() - timeout := time.Minute + timeout := 5 * time.Minute time.AfterFunc(timeout, func() { if wk.Connected() { return } - err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout) + err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout) j.Failed(err) j.CancelFn(err) }) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index e66def5b0fe8..c377681a7d64 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -16,11 +16,14 @@ package jobservices import ( + "bytes" + "context" "fmt" "io" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" "golang.org/x/exp/slog" + "google.golang.org/protobuf/encoding/prototext" ) func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingService_ReverseArtifactRetrievalServiceServer) error { @@ -47,7 +50,7 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer }, }, }) - var count int + var buf bytes.Buffer for { in, err := stream.Recv() if err == io.EOF { @@ -56,26 +59,63 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer if err != nil { return err } - if in.IsLast { - slog.Debug("GetArtifact 
finish", + if in.GetIsLast() { + slog.Info("GetArtifact finish", slog.Group("dep", slog.String("urn", dep.GetTypeUrn()), slog.String("payload", string(dep.GetTypePayload()))), - slog.Int("bytesReceived", count)) + slog.Int("bytesReceived", buf.Len()), + slog.String("rtype", fmt.Sprintf("%T", in.GetResponse())), + ) break } // Here's where we go through each environment's artifacts. // We do nothing with them. switch req := in.GetResponse().(type) { case *jobpb.ArtifactResponseWrapper_GetArtifactResponse: - count += len(req.GetArtifactResponse.GetData()) + buf.Write(req.GetArtifactResponse.GetData()) + case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse: err := fmt.Errorf("unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse()) slog.Error("GetArtifact failure", err) return err } } + if len(s.artifacts) == 0 { + s.artifacts = map[string][]byte{} + } + s.artifacts[string(dep.GetTypePayload())] = buf.Bytes() } } return nil } + +func (s *Server) ResolveArtifacts(_ context.Context, req *jobpb.ResolveArtifactsRequest) (*jobpb.ResolveArtifactsResponse, error) { + slog.Info("GetArtifact request received", "artifact", prototext.Format(req)) + return &jobpb.ResolveArtifactsResponse{ + Replacements: req.GetArtifacts(), + }, nil +} + +func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.ArtifactRetrievalService_GetArtifactServer) error { + slog.Info("GetArtifact request received", "artifact", prototext.Format(req)) + info := req.GetArtifact() + buf, ok := s.artifacts[string(info.GetTypePayload())] + if !ok { + pt := prototext.Format(info) + slog.Warn("unable to provide artifact to worker", "artifact_info", pt) + return fmt.Errorf("unable to provide %v to worker", pt) + } + chunk := 4 * 1024 * 1024 // 4 MB + var i int + for i+chunk > len(buf) { + stream.Send(&jobpb.GetArtifactResponse{ + Data: buf[i : i+chunk], + }) + i += chunk + } + stream.Send(&jobpb.GetArtifactResponse{ + Data: buf[i:], + }) + return nil +} diff --git 
a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 10d36066391f..87b0ec007bfb 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -68,6 +68,8 @@ type Job struct { key string jobName string + artifactEndpoint string + Pipeline *pipepb.Pipeline options *structpb.Struct @@ -88,6 +90,10 @@ type Job struct { metrics metricsStore } +func (j *Job) ArtifactEndpoint() string { + return j.artifactEndpoint +} + // ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids. func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { return j.metrics.ContributeTentativeMetrics(payloads) @@ -113,6 +119,10 @@ func (j *Job) LogValue() slog.Value { slog.String("name", j.jobName)) } +func (j *Job) JobKey() string { + return j.key +} + func (j *Job) SendMsg(msg string) { j.streamCond.L.Lock() defer j.streamCond.L.Unlock() diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index e626a05b51e1..720391c6ffbf 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -72,13 +72,14 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo // Since jobs execute in the background, they should not be tied to a request's context. 
rootCtx, cancelFn := context.WithCancelCause(context.Background()) job := &Job{ - key: s.nextId(), - Pipeline: req.GetPipeline(), - jobName: req.GetJobName(), - options: req.GetPipelineOptions(), - streamCond: sync.NewCond(&sync.Mutex{}), - RootCtx: rootCtx, - CancelFn: cancelFn, + key: s.nextId(), + artifactEndpoint: s.Endpoint(), + Pipeline: req.GetPipeline(), + jobName: req.GetJobName(), + options: req.GetPipelineOptions(), + streamCond: sync.NewCond(&sync.Mutex{}), + RootCtx: rootCtx, + CancelFn: cancelFn, } // Queue initial state of the job. diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index e3fb7766b519..bf2db814813c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -29,6 +29,7 @@ import ( type Server struct { jobpb.UnimplementedJobServiceServer jobpb.UnimplementedArtifactStagingServiceServer + jobpb.UnimplementedArtifactRetrievalServiceServer fnpb.UnimplementedProvisionServiceServer // Server management @@ -42,6 +43,9 @@ type Server struct { // execute defines how a job is executed. execute func(*Job) + + // Artifact hack + artifacts map[string][]byte } // NewServer acquires the indicated port. @@ -60,6 +64,7 @@ func NewServer(port int, execute func(*Job)) *Server { s.server = grpc.NewServer(opts...) 
jobpb.RegisterJobServiceServer(s.server, s) jobpb.RegisterArtifactStagingServiceServer(s.server, s) + jobpb.RegisterArtifactRetrievalServiceServer(s.server, s) return s } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go index ba5b10f5fd39..874c7aa8040f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go @@ -23,7 +23,7 @@ import ( ) func TestBundle_ProcessOn(t *testing.T) { - wk := New("test", "testEnv") + wk := New("test", "testEnv", nil) b := &B{ InstID: "testInst", PBDID: "testPBDID", diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 0ad7ccb37032..05af5308ac65 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -57,6 +57,9 @@ type W struct { ID, Env string + JobKey, ArtifactEndpoint string + envPb *pipepb.Environment + // Server management lis net.Listener server *grpc.Server @@ -80,7 +83,7 @@ type controlResponder interface { } // New starts the worker server components of FnAPI Execution. 
-func New(id, env string) *W { +func New(id, env string, envPb *pipepb.Environment) *W { lis, err := net.Listen("tcp", ":0") if err != nil { panic(fmt.Sprintf("failed to listen: %v", err)) @@ -100,6 +103,8 @@ func New(id, env string) *W { activeInstructions: make(map[string]controlResponder), Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor), + envPb: envPb, + D: &DataService{}, } slog.Debug("Serving Worker components", slog.String("endpoint", wk.Endpoint())) @@ -107,6 +112,7 @@ func New(id, env string) *W { fnpb.RegisterBeamFnDataServer(wk.server, wk) fnpb.RegisterBeamFnLoggingServer(wk.server, wk) fnpb.RegisterBeamFnStateServer(wk.server, wk) + fnpb.RegisterProvisionServiceServer(wk.server, wk) return wk } @@ -154,6 +160,7 @@ func (wk *W) NextStage() string { var minsev = fnpb.LogEntry_Severity_DEBUG func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) { + slog.Info("GetProvisionInfo", "worker", wk) endpoint := &pipepb.ApiServiceDescriptor{ Url: wk.Endpoint(), } @@ -164,10 +171,15 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest RunnerCapabilities: []string{ urns.CapabilityMonitoringInfoShortIDs, }, - LoggingEndpoint: endpoint, - ControlEndpoint: endpoint, - ArtifactEndpoint: endpoint, - // TODO add this job's RetrievalToken + LoggingEndpoint: endpoint, + ControlEndpoint: endpoint, + ArtifactEndpoint: &pipepb.ApiServiceDescriptor{ + Url: wk.ArtifactEndpoint, + }, + + RetrievalToken: wk.JobKey, + Dependencies: wk.envPb.GetDependencies(), + // TODO add this job's artifact Dependencies Metadata: map[string]string{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go index ed61f484481c..64be06bfa8da 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go @@ -32,14 +32,14 @@ import ( ) 
func TestWorker_New(t *testing.T) { - w := New("test", "testEnv") + w := New("test", "testEnv", nil) if got, want := w.ID, "test"; got != want { t.Errorf("New(%q) = %v, want %v", want, got, want) } } func TestWorker_NextInst(t *testing.T) { - w := New("test", "testEnv") + w := New("test", "testEnv", nil) instIDs := map[string]struct{}{} for i := 0; i < 100; i++ { @@ -51,7 +51,7 @@ func TestWorker_NextInst(t *testing.T) { } func TestWorker_NextStage(t *testing.T) { - w := New("test", "testEnv") + w := New("test", "testEnv", nil) stageIDs := map[string]struct{}{} for i := 0; i < 100; i++ { @@ -63,7 +63,7 @@ func TestWorker_NextStage(t *testing.T) { } func TestWorker_GetProcessBundleDescriptor(t *testing.T) { - w := New("test", "testEnv") + w := New("test", "testEnv", nil) id := "available" w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{ @@ -93,7 +93,7 @@ func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) { ctx, cancelFn := context.WithCancel(context.Background()) t.Cleanup(cancelFn) - w := New("test", "testEnv") + w := New("test", "testEnv", nil) lis := bufconn.Listen(2048) w.lis = lis t.Cleanup(func() { w.Stop() }) diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go index 732f4382ab5d..d5cc6aa7327a 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go @@ -44,7 +44,7 @@ func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken defer cc.Close() if err := StageViaPortableAPI(ctx, cc, binary, st); err == nil { - return "", nil + return st, nil } log.Warnf(ctx, "unable to stage with PortableAPI: %v; falling back to legacy", err) From 5b04e34b43761f9e1fbd9aa67babbabd9c18587c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 15:37:17 -0700 Subject: [PATCH 03/10] kill containers on job completion. 
--- sdks/go/pkg/beam/runners/prism/internal/environments.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 0578afecad4f..05ac2279758b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -167,6 +167,14 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock switch status { case "running": logger.Info("docker container is running", "container_id", containerID) + // Shut down container on job termination. + go func() { + <-ctx.Done() + // Can't use command context, since it's already canceled here. + killCmd := exec.Command("docker", "kill", containerID) + killCmd.Start() + logger.Info("docker container is killed - job finished", "container_id", containerID) + }() return nil case "dead", "exited": logDumpCmd := exec.CommandContext(ctx, "docker", "container", "logs", containerID) From 95732d3e8de684d5529f8810ea69cb6fe47069d4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:04:11 -0700 Subject: [PATCH 04/10] switch to docker sdk --- .../runners/prism/internal/environments.go | 155 +++++++++--------- 1 file changed, 80 insertions(+), 75 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 05ac2279758b..3517847905c2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -19,10 +19,8 @@ import ( "bytes" "context" "fmt" + "io" "os" - "os/exec" - "strings" - "time" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" @@ -33,6 +31,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" 
"google.golang.org/protobuf/proto" + + dtyp "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + dcli "github.com/docker/docker/client" ) // TODO move environment handling to the worker package. @@ -99,95 +102,97 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.DockerPayload, wk *worker.W, artifactEndpoint string) error { logger = logger.With("worker_id", wk.ID, "image", dp.GetContainerImage()) - // TODO: Ensure artifact server downloads to a consistent cache, and doesn't re-download artifacts. - // Ensure Go worker rebuilds are consistent? - // TODO: Fail sensibly when the image can't be downloaded or started without process crash. - var credentialArgs []string + // TODO consider preserving client? + cli, err := dcli.NewClientWithOpts(dcli.FromEnv) + if err != nil { + return fmt.Errorf("couldn't connect to docker:%w", err) + } + // TODO better abstract cloud specific auths. 
const gcloudCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS" gcloudCredsFile, ok := os.LookupEnv(gcloudCredsEnv) + var mounts []mount.Mount + var envs []string if ok { _, err := os.Stat(gcloudCredsFile) // File exists if err == nil { dockerGcloudCredsFile := "/docker_cred_file.json" - credentialArgs = append(credentialArgs, - "--mount", - fmt.Sprintf("type=bind,source=%v,target=%v", gcloudCredsFile, dockerGcloudCredsFile), - "--env", - fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile), - ) + mounts = append(mounts, mount.Mount{ + Type: "bind", + Source: gcloudCredsFile, + Target: dockerGcloudCredsFile, + }) + credEnv := fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile) + envs = append(envs, credEnv) } } logger.Info("attempting to pull docker image for environment") - pullCmd := exec.CommandContext(ctx, "docker", "pull", dp.GetContainerImage()) - pullCmd.Start() - pullCmd.Wait() - - runArgs := []string{"run", "-d", "--network=host"} - runArgs = append(runArgs, credentialArgs...) - runArgs = append(runArgs, - dp.GetContainerImage(), - fmt.Sprintf("--id=%v-%v", wk.JobKey, wk.Env), - fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), - fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), - ) - - runCmd := exec.CommandContext(ctx, "docker", runArgs...) 
- slog.Info("docker run command", "run_cmd", runCmd.String()) - var buf bytes.Buffer - runCmd.Stdout = &buf - if err := runCmd.Start(); err != nil { - return fmt.Errorf("unable to start container image %v with docker for env %v", dp.GetContainerImage(), wk.Env) + + if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil { + rc.Close() } - if err := runCmd.Wait(); err != nil { - return fmt.Errorf("docker run failed for image %v with docker for env %v", dp.GetContainerImage(), wk.Env) + + ccr, err := cli.ContainerCreate(ctx, &container.Config{ + Image: dp.GetContainerImage(), + Cmd: []string{ + fmt.Sprintf("--id=%v-%v", wk.JobKey, wk.Env), + fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), + fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), + }, + Env: envs, + Tty: false, + }, &container.HostConfig{ + NetworkMode: "host", + Mounts: mounts, + }, nil, nil, "") + if err != nil { + cli.Close() + return fmt.Errorf("unable to create container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err) } + containerID := ccr.ID + logger = logger.With("container", containerID) - containerID := strings.TrimSpace(buf.String()) - if containerID == "" { - return fmt.Errorf("docker run failed for image %v with docker for env %v - no container ID", dp.GetContainerImage(), wk.Env) + if err := cli.ContainerStart(ctx, containerID, dtyp.ContainerStartOptions{}); err != nil { + cli.Close() + return fmt.Errorf("unable to start container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err) } - logger.Info("docker container is started", "container_id", containerID) - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for range ticker.C { - buf.Reset() - inspectCmd := exec.CommandContext(ctx, "docker", "inspect", "-f", "{{.State.Status}}", containerID) - inspectCmd.Stdout 
= &buf - inspectCmd.Start() - inspectCmd.Wait() - - status := strings.TrimSpace(buf.String()) - switch status { - case "running": - logger.Info("docker container is running", "container_id", containerID) - // Shut down container on job termination. - go func() { - <-ctx.Done() - // Can't use command context, since it's already canceled here. - killCmd := exec.Command("docker", "kill", containerID) - killCmd.Start() - logger.Info("docker container is killed - job finished", "container_id", containerID) - }() - return nil - case "dead", "exited": - logDumpCmd := exec.CommandContext(ctx, "docker", "container", "logs", containerID) - buf.Reset() - logDumpCmd.Stdout = &buf - logDumpCmd.Stderr = &buf - logDumpCmd.Start() - logDumpCmd.Wait() - logger.Error("SDK failed to start.", "final_status", status, "log", buf.String()) - return fmt.Errorf("docker run failed for image %v with docker for env %v - containerID %v - log:\n%v", dp.GetContainerImage(), wk.Env, containerID, buf.String()) - default: - logger.Info("docker container status", "container_id", containerID, "status", status) + logger.Info("docker container is started") + + // Start goroutine to wait on container state. + go func() { + defer cli.Close() + + statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + select { + case <-ctx.Done(): + // Can't use command context, since it's already canceled here. 
+ err := cli.ContainerKill(context.Background(), containerID, "") + if err != nil { + logger.Error("docker container kill error", "error", err) + } + case err := <-errCh: + if err != nil { + logger.Error("docker container wait error", "error", err) + } + case resp := <-statusCh: + logger.Info("docker container has self terminated", "status_code", resp.StatusCode) + + rc, err := cli.ContainerLogs(ctx, containerID, dtyp.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) + if err != nil { + logger.Error("docker container logs error", "error", err) + } + defer rc.Close() + + var buf bytes.Buffer + io.Copy(&buf, rc) + logger.Error("container self terminated.", "log", buf.String()) } - } + }() + return nil } From f81c60b2b65fb47e4b479ac74bd3e224a542d768 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:25:47 -0700 Subject: [PATCH 05/10] tiny cleanup, log copy properly --- .../beam/runners/prism/internal/environments.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 3517847905c2..189727a5635b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -19,7 +19,6 @@ import ( "bytes" "context" "fmt" - "io" "os" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -36,6 +35,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/mount" dcli "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" ) // TODO move environment handling to the worker package. @@ -109,7 +109,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock return fmt.Errorf("couldn't connect to docker:%w", err) } - // TODO better abstract cloud specific auths. 
+ // TODO abstract mounting cloud specific auths better. const gcloudCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS" gcloudCredsFile, ok := os.LookupEnv(gcloudCredsEnv) var mounts []mount.Mount @@ -129,8 +129,6 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } } - logger.Info("attempting to pull docker image for environment") - if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil { rc.Close() } @@ -161,7 +159,6 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock cli.Close() return fmt.Errorf("unable to start container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err) } - logger.Info("docker container is started") // Start goroutine to wait on container state. go func() { @@ -182,15 +179,14 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock case resp := <-statusCh: logger.Info("docker container has self terminated", "status_code", resp.StatusCode) - rc, err := cli.ContainerLogs(ctx, containerID, dtyp.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) + rc, err := cli.ContainerLogs(ctx, containerID, dtyp.ContainerLogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) if err != nil { logger.Error("docker container logs error", "error", err) } defer rc.Close() - var buf bytes.Buffer - io.Copy(&buf, rc) - logger.Error("container self terminated.", "log", buf.String()) + stdcopy.StdCopy(&buf, &buf, rc) + logger.Error("container self terminated", "log", buf.String()) } }() From 6737f2b5353de1efd7fbc36441c62baa89b70412 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:50:50 -0700 Subject: [PATCH 06/10] Fix pulls. Reduce logging. 
--- sdks/go/pkg/beam/runners/prism/internal/environments.go | 7 ++++++- .../beam/runners/prism/internal/jobservices/artifact.go | 8 +++----- sdks/go/pkg/beam/runners/prism/internal/worker/worker.go | 1 - 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 189727a5635b..5830325bd054 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -19,6 +19,7 @@ import ( "bytes" "context" "fmt" + "io" "os" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -104,7 +105,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock logger = logger.With("worker_id", wk.ID, "image", dp.GetContainerImage()) // TODO consider preserving client? - cli, err := dcli.NewClientWithOpts(dcli.FromEnv) + cli, err := dcli.NewClientWithOpts(dcli.FromEnv, dcli.WithAPIVersionNegotiation()) if err != nil { return fmt.Errorf("couldn't connect to docker:%w", err) } @@ -130,7 +131,11 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil { + // Copy the output, but discard it so we can wait until the image pull is finished. 
+ io.Copy(io.Discard, rc) rc.Close() + } else { + logger.Warn("unable to pull image", "error", err) } ccr, err := cli.ContainerCreate(ctx, &container.Config{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index c377681a7d64..7b1790f9b20e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -60,7 +60,7 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer return err } if in.GetIsLast() { - slog.Info("GetArtifact finish", + slog.Debug("GetArtifact finished", slog.Group("dep", slog.String("urn", dep.GetTypeUrn()), slog.String("payload", string(dep.GetTypePayload()))), @@ -91,14 +91,12 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer } func (s *Server) ResolveArtifacts(_ context.Context, req *jobpb.ResolveArtifactsRequest) (*jobpb.ResolveArtifactsResponse, error) { - slog.Info("GetArtifact request received", "artifact", prototext.Format(req)) return &jobpb.ResolveArtifactsResponse{ Replacements: req.GetArtifacts(), }, nil } func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.ArtifactRetrievalService_GetArtifactServer) error { - slog.Info("GetArtifact request received", "artifact", prototext.Format(req)) info := req.GetArtifact() buf, ok := s.artifacts[string(info.GetTypePayload())] if !ok { @@ -106,9 +104,9 @@ func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.Artifac slog.Warn("unable to provide artifact to worker", "artifact_info", pt) return fmt.Errorf("unable to provide %v to worker", pt) } - chunk := 4 * 1024 * 1024 // 4 MB + chunk := 4 * 1024 * 1024 // 128 MB var i int - for i+chunk > len(buf) { + for i+chunk < len(buf) { stream.Send(&jobpb.GetArtifactResponse{ Data: buf[i : i+chunk], }) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 05af5308ac65..6c3e18d0d81f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -160,7 +160,6 @@ func (wk *W) NextStage() string { var minsev = fnpb.LogEntry_Severity_DEBUG func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) { - slog.Info("GetProvisionInfo", "worker", wk) endpoint := &pipepb.ApiServiceDescriptor{ Url: wk.Endpoint(), } From 062df0ac206d157a9d0e567f4a52a950ce1cc3f7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:53:09 -0700 Subject: [PATCH 07/10] Reduce metrics spam. --- sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go index 4a872e291c6a..c71ead208364 100644 --- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go +++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "golang.org/x/exp/slog" ) // FromMonitoringInfos extracts metrics from monitored states and @@ -139,7 +140,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) ( } } if len(errs) > 0 { - log.Printf("Warning: %v errors during metrics processing: %v\n", len(errs), errs) + slog.Debug("errors during metrics processing", "count", len(errs), "errors", errs) } return counters, distributions, gauges, msecs, pcols } From 64dd8ffccd5d22f5011a119f48086da95f595c72 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:55:59 -0700 Subject: [PATCH 08/10] 
Minimize diff --- .../go/pkg/beam/runners/prism/internal/execute.go | 4 ++-- .../prism/internal/jobservices/management.go | 15 ++++++++------- .../runners/prism/internal/worker/bundle_test.go | 2 +- .../beam/runners/prism/internal/worker/worker.go | 8 +++----- .../runners/prism/internal/worker/worker_test.go | 10 +++++----- 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index e97abba29808..d7705a753241 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -80,8 +80,8 @@ func RunPipeline(j *jobservices.Job) { // makeWorker creates a worker for that environment. func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { - envPb := j.Pipeline.GetComponents().GetEnvironments()[env] - wk := worker.New(j.String()+"_"+env, env, envPb) // Cheating by having the worker id match the environment id. + wk := worker.New(j.String()+"_"+env, env) + wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env] wk.JobKey = j.JobKey() wk.ArtifactEndpoint = j.ArtifactEndpoint() go wk.Serve() diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 720391c6ffbf..213e33a78379 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -72,14 +72,15 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo // Since jobs execute in the background, they should not be tied to a request's context. 
rootCtx, cancelFn := context.WithCancelCause(context.Background()) job := &Job{ - key: s.nextId(), + key: s.nextId(), + Pipeline: req.GetPipeline(), + jobName: req.GetJobName(), + options: req.GetPipelineOptions(), + streamCond: sync.NewCond(&sync.Mutex{}), + RootCtx: rootCtx, + CancelFn: cancelFn, + artifactEndpoint: s.Endpoint(), - Pipeline: req.GetPipeline(), - jobName: req.GetJobName(), - options: req.GetPipelineOptions(), - streamCond: sync.NewCond(&sync.Mutex{}), - RootCtx: rootCtx, - CancelFn: cancelFn, } // Queue initial state of the job. diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go index 874c7aa8040f..ba5b10f5fd39 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go @@ -23,7 +23,7 @@ import ( ) func TestBundle_ProcessOn(t *testing.T) { - wk := New("test", "testEnv", nil) + wk := New("test", "testEnv") b := &B{ InstID: "testInst", PBDID: "testPBDID", diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 6c3e18d0d81f..3a862a143b73 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -58,7 +58,7 @@ type W struct { ID, Env string JobKey, ArtifactEndpoint string - envPb *pipepb.Environment + EnvPb *pipepb.Environment // Server management lis net.Listener @@ -83,7 +83,7 @@ type controlResponder interface { } // New starts the worker server components of FnAPI Execution. 
-func New(id, env string, envPb *pipepb.Environment) *W { +func New(id, env string) *W { lis, err := net.Listen("tcp", ":0") if err != nil { panic(fmt.Sprintf("failed to listen: %v", err)) @@ -103,8 +103,6 @@ func New(id, env string, envPb *pipepb.Environment) *W { activeInstructions: make(map[string]controlResponder), Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor), - envPb: envPb, - D: &DataService{}, } slog.Debug("Serving Worker components", slog.String("endpoint", wk.Endpoint())) @@ -177,7 +175,7 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest }, RetrievalToken: wk.JobKey, - Dependencies: wk.envPb.GetDependencies(), + Dependencies: wk.EnvPb.GetDependencies(), // TODO add this job's artifact Dependencies diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go index 64be06bfa8da..ed61f484481c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go @@ -32,14 +32,14 @@ import ( ) func TestWorker_New(t *testing.T) { - w := New("test", "testEnv", nil) + w := New("test", "testEnv") if got, want := w.ID, "test"; got != want { t.Errorf("New(%q) = %v, want %v", want, got, want) } } func TestWorker_NextInst(t *testing.T) { - w := New("test", "testEnv", nil) + w := New("test", "testEnv") instIDs := map[string]struct{}{} for i := 0; i < 100; i++ { @@ -51,7 +51,7 @@ func TestWorker_NextInst(t *testing.T) { } func TestWorker_NextStage(t *testing.T) { - w := New("test", "testEnv", nil) + w := New("test", "testEnv") stageIDs := map[string]struct{}{} for i := 0; i < 100; i++ { @@ -63,7 +63,7 @@ func TestWorker_NextStage(t *testing.T) { } func TestWorker_GetProcessBundleDescriptor(t *testing.T) { - w := New("test", "testEnv", nil) + w := New("test", "testEnv") id := "available" w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{ @@ -93,7 +93,7 @@ func 
serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) { ctx, cancelFn := context.WithCancel(context.Background()) t.Cleanup(cancelFn) - w := New("test", "testEnv", nil) + w := New("test", "testEnv") lis := bufconn.Listen(2048) w.lis = lis t.Cleanup(func() { w.Stop() }) From 1a9351f5b7731d22ec55401cd64cdd7a6d11be8e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:58:47 -0700 Subject: [PATCH 09/10] Check only after worker start --- sdks/go/pkg/beam/runners/prism/internal/execute.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index d7705a753241..e0c67105d451 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -85,7 +85,11 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { wk.JobKey = j.JobKey() wk.ArtifactEndpoint = j.ArtifactEndpoint() go wk.Serve() - timeout := 5 * time.Minute + if err := runEnvironment(j.RootCtx, j, env, wk); err != nil { + return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err) + } + // Check for connection succeeding after we've created the environment successfully. + timeout := 1 * time.Minute time.AfterFunc(timeout, func() { if wk.Connected() { return @@ -94,9 +98,6 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { j.Failed(err) j.CancelFn(err) }) - if err := runEnvironment(j.RootCtx, j, env, wk); err != nil { - return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err) - } return wk, nil } From 99514a6635d42f35c440c67a876fee36a4260cd0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:02:34 -0700 Subject: [PATCH 10/10] Correct chunk size. 
--- sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index 7b1790f9b20e..99b786d45980 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -104,7 +104,7 @@ func (s *Server) GetArtifact(req *jobpb.GetArtifactRequest, stream jobpb.Artifac slog.Warn("unable to provide artifact to worker", "artifact_info", pt) return fmt.Errorf("unable to provide %v to worker", pt) } - chunk := 4 * 1024 * 1024 // 128 MB + chunk := 128 * 1024 * 1024 // 128 MB var i int for i+chunk < len(buf) { stream.Send(&jobpb.GetArtifactResponse{