diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 971bb4f83cfa..d18cc3b83732 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -87,7 +87,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor return err } go func() { - processEnvironment(ctx, pp, wk) + processEnvironment(ctx, logger, pp, wk) logger.Debug("environment stopped", slog.String("job", j.String())) }() return nil @@ -207,17 +207,18 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } logger.Debug("creating container", "envs", envs, "mounts", mounts) + cmd := []string{ + fmt.Sprintf("--id=%v", wk.ID), + fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), + fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), + } ccr, err := cli.ContainerCreate(ctx, &container.Config{ Image: dp.GetContainerImage(), - Cmd: []string{ - fmt.Sprintf("--id=%v", wk.ID), - fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), - fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), - }, - Env: envs, - Tty: false, + Cmd: cmd, + Env: envs, + Tty: false, }, &container.HostConfig{ NetworkMode: "host", Mounts: mounts, @@ -236,6 +237,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } logger.Debug("container started") + logger.Debug("container start command", "cmd", cmd) // Start goroutine to wait on container state. go func() { @@ -273,6 +275,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) if err != nil { logger.Error("docker container logs error", "error", err) + return } defer rc.Close() var buf bytes.Buffer @@ -284,8 +287,9 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock return nil } -func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) { - cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint()) +func processEnvironment(ctx context.Context, logger *slog.Logger, pp *pipepb.ProcessPayload, wk *worker.W) { + cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id='"+wk.ID+"'", "--provision_endpoint="+wk.Endpoint()) + logger.Debug("starting process", "cmd", cmd.String()) cmd.WaitDelay = time.Millisecond * 100 cmd.Stderr = os.Stderr @@ -296,9 +300,12 @@ func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *work cmd.Env = append(cmd.Environ(), fmt.Sprintf("%v=%v", k, v)) } if err := cmd.Start(); err != nil { + logger.Error("process failed to start", "error", err) return } // Job processing happens here, but orchestrated by other goroutines // This call blocks until the context is cancelled, or the command exits. - cmd.Wait() + if err := cmd.Wait(); err != nil { + logger.Error("process failed while running", "error", err) + } }