diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
index ca1b701693f8..aaf5ab50160a 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
@@ -1,4 +1,5 @@
 {
+  "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
   "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "modification": 1,
diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index 3f4759213f78..d266aa094efa 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,4 +1,5 @@
 {
+  "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
   "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "modification": 3,
diff --git a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json
index 77f68d215005..ac06b8aaf7ba 100644
--- a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_Java.json
@@ -1 +1,4 @@
-{"revision": 1}
\ No newline at end of file
+{
+  "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
+  "revision": 1
+}
diff --git a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json
index b26833333238..e328a4f4bba1 100644
--- a/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_Examples_Dataflow_V2.json
@@ -1,4 +1,5 @@
 {
+  "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "modification": 2
 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 08d84705c5c7..50675a21eace 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -139,10 +139,11 @@ private static byte[] serializeWindowingStrategy(
     try {
       SdkComponents sdkComponents = SdkComponents.create();
-      String workerHarnessContainerImageURL =
-          DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
+      String v2SdkHarnessContainerImageURL =
+          DataflowRunner.getV2SdkHarnessContainerImageForJob(
+              options.as(DataflowPipelineOptions.class));
       RunnerApi.Environment defaultEnvironmentForDataflow =
-          Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+          Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);
       sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);

       return WindowingStrategyTranslation.toMessageProto(windowingStrategy, sdkComponents)
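The trigger-file edits above only force the affected postcommit suites to run; the substantive change starts in the translator. `serializeWindowingStrategy` previously resolved its default Docker environment through the single `getContainerImageForJob` helper, which conflated the V1 worker harness image with the V2 SDK harness image; it now always takes the V2 (Fn API) path. A minimal sketch of the resulting flow, assuming code living in `org.apache.beam.runners.dataflow` (the helper is package-private) and the import layout of this file:

```java
// Sketch only: mirrors the new serializeWindowingStrategy environment setup.
// Assumes same-package access to DataflowRunner's package-private helper.
static void registerDefaultEnvironment(PipelineOptions options) {
  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  String v2Image = DataflowRunner.getV2SdkHarnessContainerImageForJob(dataflowOptions);

  // The environment baked into the windowing-strategy proto is now always an
  // SDK harness (Fn API) image, never the V1 worker harness image.
  SdkComponents sdkComponents = SdkComponents.create();
  sdkComponents.registerEnvironment(Environments.createDockerEnvironment(v2Image));
}
```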
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d25a37e92dc3..82d00dd4f144 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -518,29 +518,16 @@ static boolean isServiceEndpoint(String endpoint) {
   }

   static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) {
-    // Check against null - empty string value for workerHarnessContainerImage
-    // must be preserved for legacy dataflowWorkerJar to work.
-    String sdkContainerOption = workerOptions.getSdkContainerImage();
-    String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage();
-    Preconditions.checkArgument(
-        sdkContainerOption == null
-            || workerHarnessOption == null
-            || sdkContainerOption.equals(workerHarnessOption),
-        "Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage.");
-
-    // Default to new option, which may be null.
-    String containerImage = workerOptions.getSdkContainerImage();
-    if (workerOptions.getWorkerHarnessContainerImage() != null
-        && workerOptions.getSdkContainerImage() == null) {
-      // Set image to old option if old option was set but new option is not set.
+    if (workerOptions.getSdkContainerImage() != null
+        && workerOptions.getWorkerHarnessContainerImage() != null) {
       LOG.warn(
-          "Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage.");
-      containerImage = workerOptions.getWorkerHarnessContainerImage();
+          "Container specified for both --workerHarnessContainerImage and --sdkContainerImage. "
+              + "If you are a Beam or Dataflow developer, this could make sense, "
+              + "but otherwise may be a configuration error. "
+              + "The value of --workerHarnessContainerImage will be used only if the pipeline runs on Dataflow V1 "
+              + "and is *not* supported for end users. "
+              + "The value of --sdkContainerImage will be used only if the pipeline runs on Dataflow V2.");
     }
-
-    // Make sure both options have same value.
-    workerOptions.setSdkContainerImage(containerImage);
-    workerOptions.setWorkerHarnessContainerImage(containerImage);
   }

   @VisibleForTesting
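The validation semantics change here: the old code rejected two distinct values with `checkArgument` and silently mirrored a lone legacy value into `sdkContainerImage`; the new code leaves both options untouched and only warns when both are set. A sketch of the observable difference, with hypothetical image names and JUnit-style assertions, again assuming same-package access:

```java
DataflowPipelineWorkerPoolOptions workerOptions =
    PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
workerOptions.setWorkerHarnessContainerImage("gcr.io/my-project/v1-worker:tag"); // hypothetical
workerOptions.setSdkContainerImage("gcr.io/my-project/v2-sdk:tag"); // hypothetical

// Previously: checkArgument threw for two distinct values, and a lone legacy
// value was copied into sdkContainerImage. Now: a warning is logged and each
// option keeps its own value for its own runner path.
DataflowRunner.validateSdkContainerImageOptions(workerOptions);
assertEquals("gcr.io/my-project/v1-worker:tag", workerOptions.getWorkerHarnessContainerImage());
assertEquals("gcr.io/my-project/v2-sdk:tag", workerOptions.getSdkContainerImage());
```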
@@ -1039,7 +1026,7 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
       if (containerImage.startsWith("apache/beam")
           && !updated
           // don't update if the container image is already configured by DataflowRunner
-          && !containerImage.equals(getContainerImageForJob(options))) {
+          && !containerImage.equals(getV2SdkHarnessContainerImageForJob(options))) {
         containerImage =
             DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
                 + containerImage.substring(containerImage.lastIndexOf("/"));
@@ -1290,21 +1277,19 @@ public DataflowPipelineJob run(Pipeline pipeline) {
             + "related to Google Compute Engine usage and other Google Cloud Services.");

     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+    String v1WorkerContainerImageURL =
+        DataflowRunner.getV1WorkerContainerImageForJob(dataflowOptions);
+    String v2SdkHarnessContainerImageURL =
+        DataflowRunner.getV2SdkHarnessContainerImageForJob(dataflowOptions);

-    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
-    // with the SDK harness image (which implements Fn API).
-    //
-    // The same Environment is used in different and contradictory ways, depending on whether
-    // it is a v1 or v2 job submission.
-    RunnerApi.Environment defaultEnvironmentForDataflow =
-        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+    RunnerApi.Environment defaultEnvironmentForDataflowV2 =
+        Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);

     // The SdkComponents for portable an non-portable job submission must be kept distinct. Both
     // need the default environment.
     SdkComponents portableComponents = SdkComponents.create();
     portableComponents.registerEnvironment(
-        defaultEnvironmentForDataflow
+        defaultEnvironmentForDataflowV2
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
@@ -1343,7 +1328,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     // Capture the SdkComponents for look up during step translations
     SdkComponents dataflowV1Components = SdkComponents.create();
     dataflowV1Components.registerEnvironment(
-        defaultEnvironmentForDataflow
+        defaultEnvironmentForDataflowV2
             .toBuilder()
             .addAllDependencies(getDefaultArtifacts())
             .addAllCapabilities(Environments.getJavaCapabilities())
@@ -1469,7 +1454,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       // For runner_v1, only worker_harness_container is set.
       // For runner_v2, both worker_harness_container and sdk_harness_container are set to the same
       // value.
-      String containerImage = getContainerImageForJob(options);
+      String containerImage = getV1WorkerContainerImageForJob(options);
       for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
         workerPool.setWorkerHarnessContainerImage(containerImage);
       }
@@ -2634,38 +2619,61 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
   }

   @VisibleForTesting
-  static String getContainerImageForJob(DataflowPipelineOptions options) {
+  static String getV1WorkerContainerImageForJob(DataflowPipelineOptions options) {
+    String containerImage = options.getWorkerHarnessContainerImage();
+
+    if (containerImage == null) {
+      // If not set, construct and return default image URL.
+      return getDefaultV1WorkerContainerImageUrl(options);
+    } else if (containerImage.contains("IMAGE")) {
+      // Replace placeholder with default image name
+      return containerImage.replace("IMAGE", getDefaultV1WorkerContainerImageNameForJob(options));
+    } else {
+      return containerImage;
+    }
+  }
+
+  static String getV2SdkHarnessContainerImageForJob(DataflowPipelineOptions options) {
     String containerImage = options.getSdkContainerImage();

     if (containerImage == null) {
       // If not set, construct and return default image URL.
-      return getDefaultContainerImageUrl(options);
+      return getDefaultV2SdkHarnessContainerImageUrl(options);
     } else if (containerImage.contains("IMAGE")) {
       // Replace placeholder with default image name
-      return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
+      return containerImage.replace("IMAGE", getDefaultV2SdkHarnessContainerImageNameForJob());
     } else {
       return containerImage;
     }
   }

-  /** Construct the default Dataflow container full image URL. */
-  static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
+  /** Construct the default Dataflow worker container full image URL. */
+  static String getDefaultV1WorkerContainerImageUrl(DataflowPipelineOptions options) {
     DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
     return String.format(
         "%s/%s:%s",
         dataflowRunnerInfo.getContainerImageBaseRepository(),
-        getDefaultContainerImageNameForJob(options),
-        getDefaultContainerVersion(options));
+        getDefaultV1WorkerContainerImageNameForJob(options),
+        getDefaultV1WorkerContainerVersion(options));
+  }
+
+  /** Construct the default Java SDK container full image URL. */
+  static String getDefaultV2SdkHarnessContainerImageUrl(DataflowPipelineOptions options) {
+    DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+    return String.format(
+        "%s/%s:%s",
+        dataflowRunnerInfo.getContainerImageBaseRepository(),
+        getDefaultV2SdkHarnessContainerImageNameForJob(),
+        getDefaultV2SdkHarnessContainerVersion(options));
   }

   /**
-   * Construct the default Dataflow container image name based on pipeline type and Java version.
+   * Construct the default Dataflow V1 worker container image name based on pipeline type and Java
+   * version.
    */
-  static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
+  static String getDefaultV1WorkerContainerImageNameForJob(DataflowPipelineOptions options) {
     Environments.JavaVersion javaVersion = Environments.getJavaVersion();
-    if (useUnifiedWorker(options)) {
-      return String.format("beam_%s_sdk", javaVersion.name());
-    } else if (options.isStreaming()) {
+    if (options.isStreaming()) {
       return String.format("beam-%s-streaming", javaVersion.legacyName());
     } else {
       return String.format("beam-%s-batch", javaVersion.legacyName());
@@ -2673,20 +2681,39 @@ static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options
   }

   /**
-   * Construct the default Dataflow container image name based on pipeline type and Java version.
+   * Construct the default Java SDK container image name based on Java version, for use by
+   * Dataflow V2.
+   */
+  static String getDefaultV2SdkHarnessContainerImageNameForJob() {
+    Environments.JavaVersion javaVersion = Environments.getJavaVersion();
+    return String.format("beam_%s_sdk", javaVersion.name());
+  }
+
+  /**
+   * Construct the default Dataflow V1 worker container version.
    */
-  static String getDefaultContainerVersion(DataflowPipelineOptions options) {
+  static String getDefaultV1WorkerContainerVersion(DataflowPipelineOptions options) {
     DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
     ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
     if (releaseInfo.isDevSdkVersion()) {
-      if (useUnifiedWorker(options)) {
-        return dataflowRunnerInfo.getFnApiDevContainerVersion();
-      }
       return dataflowRunnerInfo.getLegacyDevContainerVersion();
     }
     return releaseInfo.getSdkVersion();
   }

+  /** Construct the default Java SDK container version, for use by Dataflow V2. */
+  static String getDefaultV2SdkHarnessContainerVersion(DataflowPipelineOptions options) {
+    DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+    if (releaseInfo.isDevSdkVersion()) {
+      return dataflowRunnerInfo.getFnApiDevContainerVersion();
+    }
+    return releaseInfo.getSdkVersion();
+  }
+
   static boolean useUnifiedWorker(DataflowPipelineOptions options) {
     return hasExperiment(options, "beam_fn_api")
         || hasExperiment(options, "use_runner_v2")
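With the helpers split, `useUnifiedWorker` no longer influences image-name selection: pipeline shape (batch vs. streaming) matters only on the V1 path, and the V2 name depends only on the Java version. An illustrative sketch of the defaults for a hypothetical Java 11 release SDK (the actual repository and versions come from `DataflowRunnerInfo` and `ReleaseInfo` at build time; same-package access assumed):

```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// V1 worker images are shape-dependent:
//   <repo>/beam-java11-batch:<legacy version>      when options.isStreaming() == false
//   <repo>/beam-java11-streaming:<legacy version>  when options.isStreaming() == true
options.setStreaming(true);
String v1Image = DataflowRunner.getV1WorkerContainerImageForJob(options);

// The V2 SDK harness image is shape-independent:
//   <repo>/beam_java11_sdk:<sdk version>
String v2Image = DataflowRunner.getV2SdkHarnessContainerImageForJob(options);
```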
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index fd4af6d5e043..0d63b5ef245b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -104,17 +104,11 @@ public String getAlgorithm() {
   void setDiskSizeGb(int value);

   /** Container image used as Dataflow worker harness image. */
-  /** @deprecated Use {@link #getSdkContainerImage} instead. */
   @Description(
-      "Container image used to configure a Dataflow worker. "
-          + "Can only be used for official Dataflow container images. "
-          + "Prefer using sdkContainerImage instead.")
-  @Deprecated
+      "Container image to use for the Dataflow V1 worker. Can only be used for official Dataflow container images.")
   @Hidden
   String getWorkerHarnessContainerImage();

-  /** @deprecated Use {@link #setSdkContainerImage} instead. */
-  @Deprecated
   @Hidden
   void setWorkerHarnessContainerImage(String value);

@@ -122,10 +116,7 @@ public String getAlgorithm() {
    * Container image used to configure SDK execution environment on worker. Used for custom
    * containers on portable pipelines only.
    */
-  @Description(
-      "Container image used to configure the SDK execution environment of "
-          + "pipeline code on a worker. For non-portable pipelines, can only be "
-          + "used for official Dataflow container images.")
+  @Description("Container image to use for the Beam Java SDK execution environment on Dataflow V2.")
   String getSdkContainerImage();

   void setSdkContainerImage(String value);
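For users, the net effect of the options change: `--workerHarnessContainerImage` is now a hidden, V1-only knob, so a custom container on Dataflow V2 is configured through `--sdkContainerImage` alone. A hedged usage sketch (project, region, and image are placeholders):

```java
String[] args = {
  "--runner=DataflowRunner",
  "--project=my-project", // placeholder
  "--region=us-central1", // placeholder
  "--experiments=use_runner_v2",
  "--sdkContainerImage=gcr.io/my-project/my-java-sdk:latest" // placeholder image
};
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
```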
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8226dc2c7274..208cdaf1140d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -156,7 +156,8 @@ private SdkComponents createSdkComponents(PipelineOptions options) {
     SdkComponents sdkComponents = SdkComponents.create();

     String containerImageURL =
-        DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
+        DataflowRunner.getV2SdkHarnessContainerImageForJob(
+            options.as(DataflowPipelineOptions.class));
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(containerImageURL);

@@ -1127,7 +1128,8 @@ public String apply(byte[] input) {
     file2.deleteOnExit();
     SdkComponents sdkComponents = SdkComponents.create();
     sdkComponents.registerEnvironment(
-        Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
+        Environments.createDockerEnvironment(
+                DataflowRunner.getV2SdkHarnessContainerImageForJob(options))
             .toBuilder()
             .addAllDependencies(
                 Environments.getArtifacts(
@@ -1589,7 +1591,8 @@ public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception
         Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());
     DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
-    assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
+    assertEquals(
+        DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
   }

   /**
@@ -1621,7 +1624,8 @@ public void testSetSdkContainerImageInPipelineProto() throws Exception {
         Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());
     DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
-    assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
+    assertEquals(
+        DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
   }

   @Test
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index c9bd50da0a56..b33257ac3d79 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow;

-import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files.getFileExtension;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -644,28 +643,6 @@ public void testZoneAliasWorkerZone() {
     assertEquals("us-east1-b", options.getWorkerZone());
   }

-  @Test
-  public void testAliasForLegacyWorkerHarnessContainerImage() {
-    DataflowPipelineWorkerPoolOptions options =
-        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
-    String testImage = "image.url:worker";
-    options.setWorkerHarnessContainerImage(testImage);
-    DataflowRunner.validateWorkerSettings(options);
-    assertEquals(testImage, options.getWorkerHarnessContainerImage());
-    assertEquals(testImage, options.getSdkContainerImage());
-  }
-
-  @Test
-  public void testAliasForSdkContainerImage() {
-    DataflowPipelineWorkerPoolOptions options =
-        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
-    String testImage = "image.url:sdk";
-    options.setSdkContainerImage("image.url:sdk");
-    DataflowRunner.validateWorkerSettings(options);
-    assertEquals(testImage, options.getWorkerHarnessContainerImage());
-    assertEquals(testImage, options.getSdkContainerImage());
-  }
-
   @Test
   public void testRegionRequiredForServiceRunner() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
@@ -1736,7 +1713,7 @@ private void verifySdkHarnessConfiguration(DataflowPipelineOptions options) {
     p.apply(Create.of(Arrays.asList(1, 2, 3)));

-    String defaultSdkContainerImage = DataflowRunner.getContainerImageForJob(options);
+    String defaultSdkContainerImage = DataflowRunner.getV2SdkHarnessContainerImageForJob(options);
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(defaultSdkContainerImage);
@@ -2027,7 +2004,7 @@ public void close() {}
   }

   @Test
-  public void testGetContainerImageForJobFromOption() {
+  public void testGetV2SdkHarnessContainerImageForJobFromOption() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

     String[] testCases = {
@@ -2042,14 +2019,14 @@ public void testGetContainerImageForJobFromOption() {

     for (String testCase : testCases) {
       // When image option is set, should use that exact image.
       options.setSdkContainerImage(testCase);
-      assertThat(getContainerImageForJob(options), equalTo(testCase));
+      assertThat(DataflowRunner.getV2SdkHarnessContainerImageForJob(options), equalTo(testCase));
     }
   }

   @Test
-  public void testGetContainerImageForJobFromOptionWithPlaceholder() {
+  public void testGetV1WorkerContainerImageForJobFromOptionWithPlaceholder() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setSdkContainerImage("gcr.io/IMAGE/foo");
+    options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");

     for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) {
       System.setProperty("java.specification.version", javaVersion.specification());

@@ -2057,28 +2034,37 @@ public void testGetContainerImageForJobFromOptionWithPlaceholder() {
       options.setExperiments(null);
       options.setStreaming(false);
       assertThat(
-          getContainerImageForJob(options),
+          DataflowRunner.getV1WorkerContainerImageForJob(options),
           equalTo(String.format("gcr.io/beam-%s-batch/foo", javaVersion.legacyName())));

       // streaming, legacy
       options.setExperiments(null);
       options.setStreaming(true);
       assertThat(
-          getContainerImageForJob(options),
+          DataflowRunner.getV1WorkerContainerImageForJob(options),
          equalTo(String.format("gcr.io/beam-%s-streaming/foo", javaVersion.legacyName())));
+    }
+  }
+
+  @Test
+  public void testGetV2SdkHarnessContainerImageForJobFromOptionWithPlaceholder() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setSdkContainerImage("gcr.io/IMAGE/foo");
+
+    for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) {
+      System.setProperty("java.specification.version", javaVersion.specification());

       // batch, FnAPI
       options.setExperiments(ImmutableList.of("beam_fn_api"));
       options.setStreaming(false);
       assertThat(
-          getContainerImageForJob(options),
+          DataflowRunner.getV2SdkHarnessContainerImageForJob(options),
           equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));

       // streaming, FnAPI
       options.setExperiments(ImmutableList.of("beam_fn_api"));
       options.setStreaming(true);
       assertThat(
-          getContainerImageForJob(options),
+          DataflowRunner.getV2SdkHarnessContainerImageForJob(options),
           equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));
     }
   }
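The placeholder tests above pin down the `IMAGE` substitution contract, which now differs by path. A summary sketch with hypothetical values, reusing the `options` variable from the tests and assuming a Java 11 environment:

```java
// V1: "IMAGE" expands to the legacy worker name, derived from Java version and
// pipeline shape (batch vs. streaming).
options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
options.setStreaming(false);
// getV1WorkerContainerImageForJob(options) -> "gcr.io/beam-java11-batch/foo"

// V2: "IMAGE" expands to the SDK harness name, derived from Java version only.
options.setSdkContainerImage("gcr.io/IMAGE/foo");
// getV2SdkHarnessContainerImageForJob(options) -> "gcr.io/beam_java11_sdk/foo"
```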