From 4f06f88fe0255af1bd7fc8203714767d10932909 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 27 Aug 2025 10:36:35 -0400 Subject: [PATCH 1/2] Fix segv when docker container is self-terminated --- sdks/go/pkg/beam/runners/prism/internal/environments.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 971bb4f83cfa..b1034eae150f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -273,6 +273,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true}) if err != nil { logger.Error("docker container logs error", "error", err) + return } defer rc.Close() var buf bytes.Buffer From f30708044d7c45e55006ed594ee036cce4f938f6 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 27 Aug 2025 10:38:48 -0400 Subject: [PATCH 2/2] Add some debug logging for docker and process env. --- .../runners/prism/internal/environments.go | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index b1034eae150f..d18cc3b83732 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -87,7 +87,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor return err } go func() { - processEnvironment(ctx, pp, wk) + processEnvironment(ctx, logger, pp, wk) logger.Debug("environment stopped", slog.String("job", j.String())) }() return nil @@ -207,17 +207,18 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } logger.Debug("creating container", "envs", envs, "mounts", mounts) + cmd := []string{ + fmt.Sprintf("--id=%v", wk.ID), + fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), + fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), + fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), + } ccr, err := cli.ContainerCreate(ctx, &container.Config{ Image: dp.GetContainerImage(), - Cmd: []string{ - fmt.Sprintf("--id=%v", wk.ID), - fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint), - fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()), - fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()), - }, - Env: envs, - Tty: false, + Cmd: cmd, + Env: envs, + Tty: false, }, &container.HostConfig{ NetworkMode: "host", Mounts: mounts, @@ -236,6 +237,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock } logger.Debug("container started") + logger.Debug("container start command", "cmd", cmd) // Start goroutine to wait on container state. go func() { @@ -285,8 +287,9 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock return nil } -func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) { - cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint()) +func processEnvironment(ctx context.Context, logger *slog.Logger, pp *pipepb.ProcessPayload, wk *worker.W) { + cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id='"+wk.ID+"'", "--provision_endpoint="+wk.Endpoint()) + logger.Debug("starting process", "cmd", cmd.String()) cmd.WaitDelay = time.Millisecond * 100 cmd.Stderr = os.Stderr @@ -297,9 +300,12 @@ func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *work cmd.Env = append(cmd.Environ(), fmt.Sprintf("%v=%v", k, v)) } if err := cmd.Start(); err != nil { + logger.Error("process failed to start", "error", err) return } // Job processing happens here, but orchestrated by other goroutines // This call blocks until the context is cancelled, or the command exits. - cmd.Wait() + if err := cmd.Wait(); err != nil { + logger.Error("process failed while running", "error", err) + } }