Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
return err
}
go func() {
processEnvironment(ctx, pp, wk)
processEnvironment(ctx, logger, pp, wk)
logger.Debug("environment stopped", slog.String("job", j.String()))
}()
return nil
Expand Down Expand Up @@ -207,17 +207,18 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
}
logger.Debug("creating container", "envs", envs, "mounts", mounts)

cmd := []string{
fmt.Sprintf("--id=%v", wk.ID),
fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint),
fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()),
}
ccr, err := cli.ContainerCreate(ctx, &container.Config{
Image: dp.GetContainerImage(),
Cmd: []string{
fmt.Sprintf("--id=%v", wk.ID),
fmt.Sprintf("--control_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--artifact_endpoint=%v", artifactEndpoint),
fmt.Sprintf("--provision_endpoint=%v", wk.Endpoint()),
fmt.Sprintf("--logging_endpoint=%v", wk.Endpoint()),
},
Env: envs,
Tty: false,
Cmd: cmd,
Env: envs,
Tty: false,
}, &container.HostConfig{
NetworkMode: "host",
Mounts: mounts,
Expand All @@ -236,6 +237,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
}

logger.Debug("container started")
logger.Debug("container start command", "cmd", cmd)

// Start goroutine to wait on container state.
go func() {
Expand Down Expand Up @@ -273,6 +275,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true})
if err != nil {
logger.Error("docker container logs error", "error", err)
return
}
defer rc.Close()
Copy link
Collaborator Author

@shunping shunping Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the segv happens (at line 280) if we don't have the "return" above. Particularly, rc will be nil if an error is returned from cli.ContainerLogs.

var buf bytes.Buffer
Expand All @@ -284,8 +287,9 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
return nil
}

func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint())
func processEnvironment(ctx context.Context, logger *slog.Logger, pp *pipepb.ProcessPayload, wk *worker.W) {
cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id='"+wk.ID+"'", "--provision_endpoint="+wk.Endpoint())
logger.Debug("starting process", "cmd", cmd.String())

cmd.WaitDelay = time.Millisecond * 100
cmd.Stderr = os.Stderr
Expand All @@ -296,9 +300,12 @@ func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *work
cmd.Env = append(cmd.Environ(), fmt.Sprintf("%v=%v", k, v))
}
if err := cmd.Start(); err != nil {
logger.Error("process failed to start", "error", err)
return
}
// Job processing happens here, but orchestrated by other goroutines
// This call blocks until the context is cancelled, or the command exits.
cmd.Wait()
if err := cmd.Wait(); err != nil {
logger.Error("process failed while running", "error", err)
}
}
Loading