From c6dc7516a3bff65598cd54febff1f593826ff1d4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 Aug 2025 21:17:21 -0400 Subject: [PATCH] Override localhost endpoint when a worker is running in docker on mac --- .../runners/prism/internal/environments.go | 2 +- .../runners/prism/internal/jobservices/job.go | 2 +- .../runners/prism/internal/worker/worker.go | 35 ++++++++++++++++--- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 3239c76dfe1f..971bb4f83cfa 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -79,7 +79,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor logger.Error("unmarshaling docker environment payload", "error", err) return err } - return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint()) + return dockerEnvironment(ctx, logger, dp, wk, wk.ArtifactEndpoint) case urns.EnvProcess: pp := &pipepb.ProcessPayload{} if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil { diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index f186b11fd1d8..ae0e3e73e860 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -208,7 +208,7 @@ func (j *Job) MakeWorker(env string) *worker.W { wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env] wk.PipelineOptions = j.PipelineOptions() wk.JobKey = j.JobKey() - wk.ArtifactEndpoint = j.ArtifactEndpoint() + wk.ResolveEndpoints(j.ArtifactEndpoint()) return wk } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index b4133b0332a6..1141a5b02304 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -24,6 +24,9 @@ import ( "io" "log/slog" "net" + "os" + "runtime" + "strings" "sync" "sync/atomic" @@ -58,9 +61,9 @@ type W struct { ID, Env string - JobKey, ArtifactEndpoint string - EnvPb *pipepb.Environment - PipelineOptions *structpb.Struct + JobKey, ArtifactEndpoint, endpoint string + EnvPb *pipepb.Environment + PipelineOptions *structpb.Struct // These are the ID sources inst uint64 @@ -79,8 +82,32 @@ type controlResponder interface { Respond(*fnpb.InstructionResponse) } +// resolveEndpoint checks if the worker is running inside a docker container on mac or Windows and +// if the endpoint is a "localhost" endpoint. If so, overrides it with "host.docker.internal". +// Reference: https://docs.docker.com/desktop/features/networking/#networking-mode-and-dns-behaviour-for-mac-and-windows +func (wk *W) resolveEndpoint(endpoint string) string { + // The presence of an external environment does not guarantee execution within + // Docker, as Python's LOOPBACK also runs in an external environment. + // A specific check for the "BEAM_WORKER_POOL_IN_DOCKER_VM" environment variable is required to confirm + // if the worker is running inside a Docker container. + // Python LOOPBACK mode: https://github.com/apache/beam/blob/0589b14812ec52bff9d20d3bfcd96da393b9ebdb/sdks/python/apache_beam/runners/portability/portable_runner.py#L397 + // External Environment: https://beam.apache.org/documentation/runtime/sdk-harness-config/ + + workerInDocker := wk.EnvPb.GetUrn() == urns.EnvDocker || + (wk.EnvPb.GetUrn() == urns.EnvExternal && (os.Getenv("BEAM_WORKER_POOL_IN_DOCKER_VM") == "1")) + if runtime.GOOS != "linux" && workerInDocker && strings.HasPrefix(endpoint, "localhost:") { + return "host.docker.internal:" + strings.TrimPrefix(endpoint, "localhost:") + } + return endpoint +} + +func (wk *W) ResolveEndpoints(artifactEndpoint string) { + wk.ArtifactEndpoint = wk.resolveEndpoint(artifactEndpoint) + wk.endpoint = wk.resolveEndpoint(wk.parentPool.endpoint) +} + func (wk *W) Endpoint() string { - return wk.parentPool.endpoint + return wk.endpoint } func (wk *W) String() string {