Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
logger.Error("unmarshaling docker environment payload", "error", err)
return err
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
return dockerEnvironment(ctx, logger, dp, wk, wk.ArtifactEndpoint)
case urns.EnvProcess:
pp := &pipepb.ProcessPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (j *Job) MakeWorker(env string) *worker.W {
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()

wk.ResolveEndpoints(j.ArtifactEndpoint())
return wk
}
35 changes: 31 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"io"
"log/slog"
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -58,9 +61,9 @@ type W struct {

ID, Env string

JobKey, ArtifactEndpoint string
EnvPb *pipepb.Environment
PipelineOptions *structpb.Struct
JobKey, ArtifactEndpoint, endpoint string
EnvPb *pipepb.Environment
PipelineOptions *structpb.Struct

// These are the ID sources
inst uint64
Expand All @@ -79,8 +82,32 @@ type controlResponder interface {
Respond(*fnpb.InstructionResponse)
}

// resolveEndpoint checks if the worker is running inside a docker container on mac or Windows and
// if the endpoint is a "localhost" endpoint. If so, overrides it with "host.docker.internal".
// Reference: https://docs.docker.com/desktop/features/networking/#networking-mode-and-dns-behaviour-for-mac-and-windows
func (wk *W) resolveEndpoint(endpoint string) string {
// The presence of an external environment does not guarantee execution within
// Docker, as Python's LOOPBACK also runs in an external environment.
// A specific check for the "BEAM_WORKER_POOL_IN_DOCKER_VM" environment variable is required to confirm
// if the worker is running inside a Docker container.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha! The goal here is to avoid Docker In Docker issues, where the address may be weird because we're in a docker container at all. Makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Basically, if a worker is running inside a docker and we provide a localhost provisioning address, it is going to look at the docker's own port. In order to access the host port, we have to use a different host name.

That's only for mac or windows though. Docker running on linux does not have this issue.

// Python LOOPBACK mode: https://github.com/apache/beam/blob/0589b14812ec52bff9d20d3bfcd96da393b9ebdb/sdks/python/apache_beam/runners/portability/portable_runner.py#L397
// External Environment: https://beam.apache.org/documentation/runtime/sdk-harness-config/

workerInDocker := wk.EnvPb.GetUrn() == urns.EnvDocker ||
(wk.EnvPb.GetUrn() == urns.EnvExternal && (os.Getenv("BEAM_WORKER_POOL_IN_DOCKER_VM") == "1"))
if runtime.GOOS != "linux" && workerInDocker && strings.HasPrefix(endpoint, "localhost:") {
return "host.docker.internal:" + strings.TrimPrefix(endpoint, "localhost:")
}
return endpoint
}

func (wk *W) ResolveEndpoints(artifactEndpoint string) {
wk.ArtifactEndpoint = wk.resolveEndpoint(artifactEndpoint)
wk.endpoint = wk.resolveEndpoint(wk.parentPool.endpoint)
}

func (wk *W) Endpoint() string {
return wk.parentPool.endpoint
return wk.endpoint
}

func (wk *W) String() string {
Expand Down
Loading