Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
Expand Down
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
{"revision": 1}
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"revision": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ private static byte[] serializeWindowingStrategy(
try {
SdkComponents sdkComponents = SdkComponents.create();

String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
String v2SdkHarnessContainerImageURL =
DataflowRunner.getV2SdkHarnessContainerImageForJob(
options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);

return WindowingStrategyTranslation.toMessageProto(windowingStrategy, sdkComponents)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,29 +518,16 @@ static boolean isServiceEndpoint(String endpoint) {
}

static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) {
// Check against null - empty string value for workerHarnessContainerImage
// must be preserved for legacy dataflowWorkerJar to work.
String sdkContainerOption = workerOptions.getSdkContainerImage();
String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage();
Preconditions.checkArgument(
sdkContainerOption == null
|| workerHarnessOption == null
|| sdkContainerOption.equals(workerHarnessOption),
"Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage.");

// Default to new option, which may be null.
String containerImage = workerOptions.getSdkContainerImage();
if (workerOptions.getWorkerHarnessContainerImage() != null
&& workerOptions.getSdkContainerImage() == null) {
// Set image to old option if old option was set but new option is not set.
if (workerOptions.getSdkContainerImage() != null
&& workerOptions.getWorkerHarnessContainerImage() != null) {
LOG.warn(
"Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage.");
containerImage = workerOptions.getWorkerHarnessContainerImage();
"Container specified for both --workerHarnessContainerImage and --sdkContainerImage. "
+ "If you are a Beam or Dataflow developer, this could make sense, "
+ "but otherwise may be a configuration error. "
+ "The value of --workerHarnessContainerImage will be used only if the pipeline runs on Dataflow V1 "
+ "and is *not* supported for end users. "
+ "The value of --sdkContainerImage will be used only if the pipeline runs on Dataflow V2");
}

// Make sure both options have same value.
workerOptions.setSdkContainerImage(containerImage);
workerOptions.setWorkerHarnessContainerImage(containerImage);
}

@VisibleForTesting
Expand Down Expand Up @@ -1039,7 +1026,7 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
if (containerImage.startsWith("apache/beam")
&& !updated
// don't update if the container image is already configured by DataflowRunner
&& !containerImage.equals(getContainerImageForJob(options))) {
&& !containerImage.equals(getV2SdkHarnessContainerImageForJob(options))) {
containerImage =
DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
+ containerImage.substring(containerImage.lastIndexOf("/"));
Expand Down Expand Up @@ -1290,21 +1277,19 @@ public DataflowPipelineJob run(Pipeline pipeline) {
+ "related to Google Compute Engine usage and other Google Cloud Services.");

DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
String v1WorkerContainerImageURL =
DataflowRunner.getV1WorkerContainerImageForJob(dataflowOptions);
String v2SdkHarnessContainerImageURL =
DataflowRunner.getV2SdkHarnessContainerImageForJob(dataflowOptions);

// This incorrectly puns the worker harness container image (which implements v1beta3 API)
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways, depending on whether
// it is a v1 or v2 job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
RunnerApi.Environment defaultEnvironmentForDataflowV2 =
Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the comments of this line (L1284-1290) still relevant (or need update)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Removed the comment and named variable to make it obvious.


// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
// need the default environment.
SdkComponents portableComponents = SdkComponents.create();
portableComponents.registerEnvironment(
defaultEnvironmentForDataflow
defaultEnvironmentForDataflowV2
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
Expand Down Expand Up @@ -1343,7 +1328,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
defaultEnvironmentForDataflow
defaultEnvironmentForDataflowV2
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
Expand Down Expand Up @@ -1469,7 +1454,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// For runner_v1, only worker_harness_container is set.
// For runner_v2, both worker_harness_container and sdk_harness_container are set to the same
// value.
String containerImage = getContainerImageForJob(options);
String containerImage = getV1WorkerContainerImageForJob(options);
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
workerPool.setWorkerHarnessContainerImage(containerImage);
}
Expand Down Expand Up @@ -2634,59 +2619,101 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
}

@VisibleForTesting
static String getContainerImageForJob(DataflowPipelineOptions options) {
static String getV1WorkerContainerImageForJob(DataflowPipelineOptions options) {
String containerImage = options.getWorkerHarnessContainerImage();

if (containerImage == null) {
// If not set, construct and return default image URL.
return getDefaultV1WorkerContainerImageUrl(options);
} else if (containerImage.contains("IMAGE")) {
// Replace placeholder with default image name
return containerImage.replace("IMAGE", getDefaultV1WorkerContainerImageNameForJob(options));
} else {
return containerImage;
}
}

static String getV2SdkHarnessContainerImageForJob(DataflowPipelineOptions options) {
String containerImage = options.getSdkContainerImage();

if (containerImage == null) {
// If not set, construct and return default image URL.
return getDefaultContainerImageUrl(options);
return getDefaultV2SdkHarnessContainerImageUrl(options);
} else if (containerImage.contains("IMAGE")) {
// Replace placeholder with default image name
return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
return containerImage.replace("IMAGE", getDefaultV2SdkHarnessContainerImageNameForJob());
} else {
return containerImage;
}
}

/** Construct the default Dataflow container full image URL. */
static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
/** Construct the default Dataflow worker container full image URL. */
static String getDefaultV1WorkerContainerImageUrl(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
return String.format(
"%s/%s:%s",
dataflowRunnerInfo.getContainerImageBaseRepository(),
getDefaultContainerImageNameForJob(options),
getDefaultContainerVersion(options));
getDefaultV1WorkerContainerImageNameForJob(options),
getDefaultV1WorkerContainerVersion(options));
}

/**
 * Builds the full default Java SDK harness container image URL (repository/name:version) used
 * when the pipeline runs on Dataflow V2.
 */
static String getDefaultV2SdkHarnessContainerImageUrl(DataflowPipelineOptions options) {
  // Assemble the URL from its three parts: base repository, image name, and version tag.
  String repository =
      DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository();
  String imageName = getDefaultV2SdkHarnessContainerImageNameForJob();
  String version = getDefaultV2SdkHarnessContainerVersion(options);
  return repository + "/" + imageName + ":" + version;
}

/**
* Construct the default Dataflow container image name based on pipeline type and Java version.
* Construct the default Dataflow V1 worker container image name based on pipeline type and Java
* version.
*/
static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
static String getDefaultV1WorkerContainerImageNameForJob(DataflowPipelineOptions options) {
Environments.JavaVersion javaVersion = Environments.getJavaVersion();
if (useUnifiedWorker(options)) {
return String.format("beam_%s_sdk", javaVersion.name());
} else if (options.isStreaming()) {
if (options.isStreaming()) {
return String.format("beam-%s-streaming", javaVersion.legacyName());
} else {
return String.format("beam-%s-batch", javaVersion.legacyName());
}
}

/**
 * Returns the default Java SDK harness container image name for use by Dataflow V2, derived
 * from the Java version detected at runtime (e.g. {@code beam_java11_sdk}).
 */
static String getDefaultV2SdkHarnessContainerImageNameForJob() {
  return "beam_" + Environments.getJavaVersion().name() + "_sdk";
}

/**
* Construct the default Dataflow V1 worker container image name based on pipeline type and Java
* version.
*/
static String getDefaultContainerVersion(DataflowPipelineOptions options) {
static String getDefaultV1WorkerContainerVersion(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
if (releaseInfo.isDevSdkVersion()) {
if (useUnifiedWorker(options)) {
return dataflowRunnerInfo.getFnApiDevContainerVersion();
}
return dataflowRunnerInfo.getLegacyDevContainerVersion();
}
return releaseInfo.getSdkVersion();
}

/**
 * Returns the default SDK harness container image version (tag) for Dataflow V2.
 *
 * <p>Development SDK builds use the Fn API dev container version published by the runner;
 * released SDKs use the SDK release version.
 *
 * @param options pipeline options; currently unused, kept for signature symmetry with the V1
 *     variant and for future version selection based on options.
 */
static String getDefaultV2SdkHarnessContainerVersion(DataflowPipelineOptions options) {
  ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
  if (releaseInfo.isDevSdkVersion()) {
    // Only dev builds need runner info; avoid loading it on the release path.
    return DataflowRunnerInfo.getDataflowRunnerInfo().getFnApiDevContainerVersion();
  }
  return releaseInfo.getSdkVersion();
}

static boolean useUnifiedWorker(DataflowPipelineOptions options) {
return hasExperiment(options, "beam_fn_api")
|| hasExperiment(options, "use_runner_v2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,28 +104,19 @@ public String getAlgorithm() {
void setDiskSizeGb(int value);

/** Container image used as Dataflow worker harness image. */
/** @deprecated Use {@link #getSdkContainerImage} instead. */
@Description(
"Container image used to configure a Dataflow worker. "
+ "Can only be used for official Dataflow container images. "
+ "Prefer using sdkContainerImage instead.")
@Deprecated
"Container image to use for Dataflow V1 worker. Can only be used for official Dataflow container images. ")
@Hidden
String getWorkerHarnessContainerImage();

/** @deprecated Use {@link #setSdkContainerImage} instead. */
@Deprecated
@Hidden
void setWorkerHarnessContainerImage(String value);

/**
* Container image used to configure SDK execution environment on worker. Used for custom
* containers on portable pipelines only.
*/
@Description(
"Container image used to configure the SDK execution environment of "
+ "pipeline code on a worker. For non-portable pipelines, can only be "
+ "used for official Dataflow container images.")
@Description("Container image to use for Beam Java SDK execution environment on Dataflow V2.")
String getSdkContainerImage();

void setSdkContainerImage(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();

String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
DataflowRunner.getV2SdkHarnessContainerImageForJob(
options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(containerImageURL);

Expand Down Expand Up @@ -1127,7 +1128,8 @@ public String apply(byte[] input) {
file2.deleteOnExit();
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
Environments.createDockerEnvironment(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options))
.toBuilder()
.addAllDependencies(
Environments.getArtifacts(
Expand Down Expand Up @@ -1589,7 +1591,8 @@ public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception
Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());

DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
assertEquals(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
}

/**
Expand Down Expand Up @@ -1621,7 +1624,8 @@ public void testSetSdkContainerImageInPipelineProto() throws Exception {
Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());

DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
assertEquals(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
}

@Test
Expand Down
Loading
Loading