From d8bb7bacd46a2a087c0d1aebc7d386db14462605 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 Sep 2025 14:48:21 -0400 Subject: [PATCH] Revert "Cleanly separate v1 worker and v2 sdk harness container image handling in DataflowRunner" This reverts commit 5e256087536787e14ec916d3603bc66ff22e0e8d. --- .../dataflow/DataflowPipelineTranslator.java | 7 +- .../beam/runners/dataflow/DataflowRunner.java | 127 +++++++----------- .../DataflowPipelineWorkerPoolOptions.java | 13 +- .../DataflowPipelineTranslatorTest.java | 12 +- .../runners/dataflow/DataflowRunnerTest.java | 50 ++++--- 5 files changed, 100 insertions(+), 109 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 50675a21eace..08d84705c5c7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -139,11 +139,10 @@ private static byte[] serializeWindowingStrategy( try { SdkComponents sdkComponents = SdkComponents.create(); - String v2SdkHarnessContainerImageURL = - DataflowRunner.getV2SdkHarnessContainerImageForJob( - options.as(DataflowPipelineOptions.class)); + String workerHarnessContainerImageURL = + DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = - Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL); + Environments.createDockerEnvironment(workerHarnessContainerImageURL); sdkComponents.registerEnvironment(defaultEnvironmentForDataflow); return WindowingStrategyTranslation.toMessageProto(windowingStrategy, sdkComponents) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 82d00dd4f144..d25a37e92dc3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -518,16 +518,29 @@ static boolean isServiceEndpoint(String endpoint) { } static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) { - if (workerOptions.getSdkContainerImage() != null - && workerOptions.getWorkerHarnessContainerImage() != null) { + // Check against null - empty string value for workerHarnessContainerImage + // must be preserved for legacy dataflowWorkerJar to work. + String sdkContainerOption = workerOptions.getSdkContainerImage(); + String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage(); + Preconditions.checkArgument( + sdkContainerOption == null + || workerHarnessOption == null + || sdkContainerOption.equals(workerHarnessOption), + "Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage."); + + // Default to new option, which may be null. + String containerImage = workerOptions.getSdkContainerImage(); + if (workerOptions.getWorkerHarnessContainerImage() != null + && workerOptions.getSdkContainerImage() == null) { + // Set image to old option if old option was set but new option is not set. LOG.warn( - "Container specified for both --workerHarnessContainerImage and --sdkContainerImage. " - + "If you are a Beam of Dataflow developer, this could make sense, " - + "but otherwise may be a configuration error. " - + "The value of --workerHarnessContainerImage will be used only if the pipeline runs on Dataflow V1 " - + "and is *not* supported for end users. " - + "The value of --sdkContainerImage will be used only if the pipeline runs on Dataflow V2"); + "Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage."); + containerImage = workerOptions.getWorkerHarnessContainerImage(); } + + // Make sure both options have same value. + workerOptions.setSdkContainerImage(containerImage); + workerOptions.setWorkerHarnessContainerImage(containerImage); } @VisibleForTesting @@ -1026,7 +1039,7 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides( if (containerImage.startsWith("apache/beam") && !updated // don't update if the container image is already configured by DataflowRunner - && !containerImage.equals(getV2SdkHarnessContainerImageForJob(options))) { + && !containerImage.equals(getContainerImageForJob(options))) { containerImage = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository() + containerImage.substring(containerImage.lastIndexOf("/")); @@ -1277,19 +1290,21 @@ public DataflowPipelineJob run(Pipeline pipeline) { + "related to Google Compute Engine usage and other Google Cloud Services."); DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - String v1WorkerContainerImageURL = - DataflowRunner.getV1WorkerContainerImageForJob(dataflowOptions); - String v2SdkHarnessContainerImageURL = - DataflowRunner.getV2SdkHarnessContainerImageForJob(dataflowOptions); + String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions); - RunnerApi.Environment defaultEnvironmentForDataflowV2 = - Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL); + // This incorrectly puns the worker harness container image (which implements v1beta3 API) + // with the SDK harness image (which implements Fn API). + // + // The same Environment is used in different and contradictory ways, depending on whether + // it is a v1 or v2 job submission. + RunnerApi.Environment defaultEnvironmentForDataflow = + Environments.createDockerEnvironment(workerHarnessContainerImageURL); // The SdkComponents for portable an non-portable job submission must be kept distinct. Both // need the default environment. SdkComponents portableComponents = SdkComponents.create(); portableComponents.registerEnvironment( - defaultEnvironmentForDataflowV2 + defaultEnvironmentForDataflow .toBuilder() .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) @@ -1328,7 +1343,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Capture the SdkComponents for look up during step translations SdkComponents dataflowV1Components = SdkComponents.create(); dataflowV1Components.registerEnvironment( - defaultEnvironmentForDataflowV2 + defaultEnvironmentForDataflow .toBuilder() .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) @@ -1454,7 +1469,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // For runner_v1, only worker_harness_container is set. // For runner_v2, both worker_harness_container and sdk_harness_container are set to the same // value. - String containerImage = getV1WorkerContainerImageForJob(options); + String containerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { workerPool.setWorkerHarnessContainerImage(containerImage); } @@ -2619,97 +2634,55 @@ public Map, ReplacementOutput> mapOutputs( } @VisibleForTesting - static String getV1WorkerContainerImageForJob(DataflowPipelineOptions options) { - String containerImage = options.getWorkerHarnessContainerImage(); - - if (containerImage == null) { - // If not set, construct and return default image URL. - return getDefaultV1WorkerContainerImageUrl(options); - } else if (containerImage.contains("IMAGE")) { - // Replace placeholder with default image name - return containerImage.replace("IMAGE", getDefaultV1WorkerContainerImageNameForJob(options)); - } else { - return containerImage; - } - } - - static String getV2SdkHarnessContainerImageForJob(DataflowPipelineOptions options) { + static String getContainerImageForJob(DataflowPipelineOptions options) { String containerImage = options.getSdkContainerImage(); if (containerImage == null) { // If not set, construct and return default image URL. - return getDefaultV2SdkHarnessContainerImageUrl(options); + return getDefaultContainerImageUrl(options); } else if (containerImage.contains("IMAGE")) { // Replace placeholder with default image name - return containerImage.replace("IMAGE", getDefaultV2SdkHarnessContainerImageNameForJob()); + return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options)); } else { return containerImage; } } - /** Construct the default Dataflow worker container full image URL. */ - static String getDefaultV1WorkerContainerImageUrl(DataflowPipelineOptions options) { + /** Construct the default Dataflow container full image URL. */ + static String getDefaultContainerImageUrl(DataflowPipelineOptions options) { DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); return String.format( "%s/%s:%s", dataflowRunnerInfo.getContainerImageBaseRepository(), - getDefaultV1WorkerContainerImageNameForJob(options), - getDefaultV1WorkerContainerVersion(options)); - } - - /** Construct the default Java SDK container full image URL. */ - static String getDefaultV2SdkHarnessContainerImageUrl(DataflowPipelineOptions options) { - DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); - return String.format( - "%s/%s:%s", - dataflowRunnerInfo.getContainerImageBaseRepository(), - getDefaultV2SdkHarnessContainerImageNameForJob(), - getDefaultV2SdkHarnessContainerVersion(options)); + getDefaultContainerImageNameForJob(options), + getDefaultContainerVersion(options)); } /** - * Construct the default Dataflow V1 worker container image name based on pipeline type and Java - * version. + * Construct the default Dataflow container image name based on pipeline type and Java version. */ - static String getDefaultV1WorkerContainerImageNameForJob(DataflowPipelineOptions options) { + static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) { Environments.JavaVersion javaVersion = Environments.getJavaVersion(); - if (options.isStreaming()) { + if (useUnifiedWorker(options)) { + return String.format("beam_%s_sdk", javaVersion.name()); + } else if (options.isStreaming()) { return String.format("beam-%s-streaming", javaVersion.legacyName()); } else { return String.format("beam-%s-batch", javaVersion.legacyName()); } } - /** - * Construct the default Java SDK container image name based on pipeline type and Java version, - * for use by Dataflow V2. - */ - static String getDefaultV2SdkHarnessContainerImageNameForJob() { - Environments.JavaVersion javaVersion = Environments.getJavaVersion(); - return String.format("beam_%s_sdk", javaVersion.name()); - } - - /** - * Construct the default Dataflow V1 worker container image name based on pipeline type and Java - * version. - */ - static String getDefaultV1WorkerContainerVersion(DataflowPipelineOptions options) { - DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); - ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo(); - if (releaseInfo.isDevSdkVersion()) { - return dataflowRunnerInfo.getLegacyDevContainerVersion(); - } - return releaseInfo.getSdkVersion(); - } - /** * Construct the default Dataflow container image name based on pipeline type and Java version. */ - static String getDefaultV2SdkHarnessContainerVersion(DataflowPipelineOptions options) { + static String getDefaultContainerVersion(DataflowPipelineOptions options) { DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo(); if (releaseInfo.isDevSdkVersion()) { - return dataflowRunnerInfo.getFnApiDevContainerVersion(); + if (useUnifiedWorker(options)) { + return dataflowRunnerInfo.getFnApiDevContainerVersion(); + } + return dataflowRunnerInfo.getLegacyDevContainerVersion(); } return releaseInfo.getSdkVersion(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index 0d63b5ef245b..fd4af6d5e043 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -104,11 +104,17 @@ public String getAlgorithm() { void setDiskSizeGb(int value); /** Container image used as Dataflow worker harness image. */ + /** @deprecated Use {@link #getSdkContainerImage} instead. */ @Description( - "Container image to use for Dataflow V1 worker. Can only be used for official Dataflow container images. ") + "Container image used to configure a Dataflow worker. " + + "Can only be used for official Dataflow container images. " + + "Prefer using sdkContainerImage instead.") + @Deprecated @Hidden String getWorkerHarnessContainerImage(); + /** @deprecated Use {@link #setSdkContainerImage} instead. */ + @Deprecated @Hidden void setWorkerHarnessContainerImage(String value); @@ -116,7 +122,10 @@ public String getAlgorithm() { * Container image used to configure SDK execution environment on worker. Used for custom * containers on portable pipelines only. */ - @Description("Container image to use for Beam Java SDK execution environment on Dataflow V2.") + @Description( + "Container image used to configure the SDK execution environment of " + + "pipeline code on a worker. For non-portable pipelines, can only be " + + "used for official Dataflow container images.") String getSdkContainerImage(); void setSdkContainerImage(String value); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 208cdaf1140d..8226dc2c7274 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -156,8 +156,7 @@ private SdkComponents createSdkComponents(PipelineOptions options) { SdkComponents sdkComponents = SdkComponents.create(); String containerImageURL = - DataflowRunner.getV2SdkHarnessContainerImageForJob( - options.as(DataflowPipelineOptions.class)); + DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(containerImageURL); @@ -1128,8 +1127,7 @@ public String apply(byte[] input) { file2.deleteOnExit(); SdkComponents sdkComponents = SdkComponents.create(); sdkComponents.registerEnvironment( - Environments.createDockerEnvironment( - DataflowRunner.getV2SdkHarnessContainerImageForJob(options)) + Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options)) .toBuilder() .addAllDependencies( Environments.getArtifacts( @@ -1591,8 +1589,7 @@ public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values()); DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload()); - assertEquals( - DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage()); + assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage()); } /** @@ -1624,8 +1621,7 @@ public void testSetSdkContainerImageInPipelineProto() throws Exception { Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values()); DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload()); - assertEquals( - DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage()); + assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage()); } @Test diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index b33257ac3d79..c9bd50da0a56 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow; +import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files.getFileExtension; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -643,6 +644,28 @@ public void testZoneAliasWorkerZone() { assertEquals("us-east1-b", options.getWorkerZone()); } + @Test + public void testAliasForLegacyWorkerHarnessContainerImage() { + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); + String testImage = "image.url:worker"; + options.setWorkerHarnessContainerImage(testImage); + DataflowRunner.validateWorkerSettings(options); + assertEquals(testImage, options.getWorkerHarnessContainerImage()); + assertEquals(testImage, options.getSdkContainerImage()); + } + + @Test + public void testAliasForSdkContainerImage() { + DataflowPipelineWorkerPoolOptions options = + PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class); + String testImage = "image.url:sdk"; + options.setSdkContainerImage("image.url:sdk"); + DataflowRunner.validateWorkerSettings(options); + assertEquals(testImage, options.getWorkerHarnessContainerImage()); + assertEquals(testImage, options.getSdkContainerImage()); + } + @Test public void testRegionRequiredForServiceRunner() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); @@ -1713,7 +1736,7 @@ private void verifySdkHarnessConfiguration(DataflowPipelineOptions options) { p.apply(Create.of(Arrays.asList(1, 2, 3))); - String defaultSdkContainerImage = DataflowRunner.getV2SdkHarnessContainerImageForJob(options); + String defaultSdkContainerImage = DataflowRunner.getContainerImageForJob(options); SdkComponents sdkComponents = SdkComponents.create(); RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(defaultSdkContainerImage); @@ -2004,7 +2027,7 @@ public void close() {} } @Test - public void testGetV2SdkHarnessContainerImageForJobFromOption() { + public void testGetContainerImageForJobFromOption() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); String[] testCases = { @@ -2019,14 +2042,14 @@ public void testGetV2SdkHarnessContainerImageForJobFromOption() { for (String testCase : testCases) { // When image option is set, should use that exact image. options.setSdkContainerImage(testCase); - assertThat(DataflowRunner.getV2SdkHarnessContainerImageForJob(options), equalTo(testCase)); + assertThat(getContainerImageForJob(options), equalTo(testCase)); } } @Test - public void testGetV1WorkerContainerImageForJobFromOptionWithPlaceholder() { + public void testGetContainerImageForJobFromOptionWithPlaceholder() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo"); + options.setSdkContainerImage("gcr.io/IMAGE/foo"); for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) { System.setProperty("java.specification.version", javaVersion.specification()); @@ -2034,37 +2057,28 @@ public void testGetV1WorkerContainerImageForJobFromOptionWithPlaceholder() { options.setExperiments(null); options.setStreaming(false); assertThat( - DataflowRunner.getV1WorkerContainerImageForJob(options), + getContainerImageForJob(options), equalTo(String.format("gcr.io/beam-%s-batch/foo", javaVersion.legacyName()))); // streaming, legacy options.setExperiments(null); options.setStreaming(true); assertThat( - DataflowRunner.getV1WorkerContainerImageForJob(options), + getContainerImageForJob(options), equalTo(String.format("gcr.io/beam-%s-streaming/foo", javaVersion.legacyName()))); - } - } - - @Test - public void testGetV2SdkHarnessContainerImageForJobFromOptionWithPlaceholder() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setSdkContainerImage("gcr.io/IMAGE/foo"); - for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) { - System.setProperty("java.specification.version", javaVersion.specification()); // batch, FnAPI options.setExperiments(ImmutableList.of("beam_fn_api")); options.setStreaming(false); assertThat( - DataflowRunner.getV2SdkHarnessContainerImageForJob(options), + getContainerImageForJob(options), equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name()))); // streaming, FnAPI options.setExperiments(ImmutableList.of("beam_fn_api")); options.setStreaming(true); assertThat( - DataflowRunner.getV2SdkHarnessContainerImageForJob(options), + getContainerImageForJob(options), equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name()))); } }