Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ private static byte[] serializeWindowingStrategy(
try {
SdkComponents sdkComponents = SdkComponents.create();

String v2SdkHarnessContainerImageURL =
DataflowRunner.getV2SdkHarnessContainerImageForJob(
options.as(DataflowPipelineOptions.class));
String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);

return WindowingStrategyTranslation.toMessageProto(windowingStrategy, sdkComponents)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,29 @@ static boolean isServiceEndpoint(String endpoint) {
}

static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) {
if (workerOptions.getSdkContainerImage() != null
&& workerOptions.getWorkerHarnessContainerImage() != null) {
// Check against null - empty string value for workerHarnessContainerImage
// must be preserved for legacy dataflowWorkerJar to work.
String sdkContainerOption = workerOptions.getSdkContainerImage();
String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage();
Preconditions.checkArgument(
sdkContainerOption == null
|| workerHarnessOption == null
|| sdkContainerOption.equals(workerHarnessOption),
"Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage.");

// Default to new option, which may be null.
String containerImage = workerOptions.getSdkContainerImage();
if (workerOptions.getWorkerHarnessContainerImage() != null
&& workerOptions.getSdkContainerImage() == null) {
// Set image to old option if old option was set but new option is not set.
LOG.warn(
"Container specified for both --workerHarnessContainerImage and --sdkContainerImage. "
+ "If you are a Beam of Dataflow developer, this could make sense, "
+ "but otherwise may be a configuration error. "
+ "The value of --workerHarnessContainerImage will be used only if the pipeline runs on Dataflow V1 "
+ "and is *not* supported for end users. "
+ "The value of --sdkContainerImage will be used only if the pipeline runs on Dataflow V2");
"Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage.");
containerImage = workerOptions.getWorkerHarnessContainerImage();
}

// Make sure both options have same value.
workerOptions.setSdkContainerImage(containerImage);
workerOptions.setWorkerHarnessContainerImage(containerImage);
}

@VisibleForTesting
Expand Down Expand Up @@ -1026,7 +1039,7 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
if (containerImage.startsWith("apache/beam")
&& !updated
// don't update if the container image is already configured by DataflowRunner
&& !containerImage.equals(getV2SdkHarnessContainerImageForJob(options))) {
&& !containerImage.equals(getContainerImageForJob(options))) {
containerImage =
DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
+ containerImage.substring(containerImage.lastIndexOf("/"));
Expand Down Expand Up @@ -1277,19 +1290,21 @@ public DataflowPipelineJob run(Pipeline pipeline) {
+ "related to Google Compute Engine usage and other Google Cloud Services.");

DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
String v1WorkerContainerImageURL =
DataflowRunner.getV1WorkerContainerImageForJob(dataflowOptions);
String v2SdkHarnessContainerImageURL =
DataflowRunner.getV2SdkHarnessContainerImageForJob(dataflowOptions);
String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);

RunnerApi.Environment defaultEnvironmentForDataflowV2 =
Environments.createDockerEnvironment(v2SdkHarnessContainerImageURL);
// This incorrectly puns the worker harness container image (which implements v1beta3 API)
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways, depending on whether
// it is a v1 or v2 job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);

// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
// need the default environment.
SdkComponents portableComponents = SdkComponents.create();
portableComponents.registerEnvironment(
defaultEnvironmentForDataflowV2
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
Expand Down Expand Up @@ -1328,7 +1343,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
defaultEnvironmentForDataflowV2
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
Expand Down Expand Up @@ -1454,7 +1469,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// For runner_v1, only worker_harness_container is set.
// For runner_v2, both worker_harness_container and sdk_harness_container are set to the same
// value.
String containerImage = getV1WorkerContainerImageForJob(options);
String containerImage = getContainerImageForJob(options);
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
workerPool.setWorkerHarnessContainerImage(containerImage);
}
Expand Down Expand Up @@ -2619,97 +2634,55 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
}

@VisibleForTesting
static String getV1WorkerContainerImageForJob(DataflowPipelineOptions options) {
String containerImage = options.getWorkerHarnessContainerImage();

if (containerImage == null) {
// If not set, construct and return default image URL.
return getDefaultV1WorkerContainerImageUrl(options);
} else if (containerImage.contains("IMAGE")) {
// Replace placeholder with default image name
return containerImage.replace("IMAGE", getDefaultV1WorkerContainerImageNameForJob(options));
} else {
return containerImage;
}
}

static String getV2SdkHarnessContainerImageForJob(DataflowPipelineOptions options) {
static String getContainerImageForJob(DataflowPipelineOptions options) {
String containerImage = options.getSdkContainerImage();

if (containerImage == null) {
// If not set, construct and return default image URL.
return getDefaultV2SdkHarnessContainerImageUrl(options);
return getDefaultContainerImageUrl(options);
} else if (containerImage.contains("IMAGE")) {
// Replace placeholder with default image name
return containerImage.replace("IMAGE", getDefaultV2SdkHarnessContainerImageNameForJob());
return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
} else {
return containerImage;
}
}

/** Construct the default Dataflow worker container full image URL. */
static String getDefaultV1WorkerContainerImageUrl(DataflowPipelineOptions options) {
/** Construct the default Dataflow container full image URL. */
static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
return String.format(
"%s/%s:%s",
dataflowRunnerInfo.getContainerImageBaseRepository(),
getDefaultV1WorkerContainerImageNameForJob(options),
getDefaultV1WorkerContainerVersion(options));
}

/** Construct the default Java SDK container full image URL. */
static String getDefaultV2SdkHarnessContainerImageUrl(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
return String.format(
"%s/%s:%s",
dataflowRunnerInfo.getContainerImageBaseRepository(),
getDefaultV2SdkHarnessContainerImageNameForJob(),
getDefaultV2SdkHarnessContainerVersion(options));
getDefaultContainerImageNameForJob(options),
getDefaultContainerVersion(options));
}

/**
* Construct the default Dataflow V1 worker container image name based on pipeline type and Java
* version.
* Construct the default Dataflow container image name based on pipeline type and Java version.
*/
static String getDefaultV1WorkerContainerImageNameForJob(DataflowPipelineOptions options) {
static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
Environments.JavaVersion javaVersion = Environments.getJavaVersion();
if (options.isStreaming()) {
if (useUnifiedWorker(options)) {
return String.format("beam_%s_sdk", javaVersion.name());
} else if (options.isStreaming()) {
return String.format("beam-%s-streaming", javaVersion.legacyName());
} else {
return String.format("beam-%s-batch", javaVersion.legacyName());
}
}

/**
* Construct the default Java SDK container image name based on pipeline type and Java version,
* for use by Dataflow V2.
*/
static String getDefaultV2SdkHarnessContainerImageNameForJob() {
Environments.JavaVersion javaVersion = Environments.getJavaVersion();
return String.format("beam_%s_sdk", javaVersion.name());
}

/**
* Construct the default Dataflow V1 worker container image name based on pipeline type and Java
* version.
*/
static String getDefaultV1WorkerContainerVersion(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
if (releaseInfo.isDevSdkVersion()) {
return dataflowRunnerInfo.getLegacyDevContainerVersion();
}
return releaseInfo.getSdkVersion();
}

/**
* Construct the default Dataflow container image name based on pipeline type and Java version.
*/
static String getDefaultV2SdkHarnessContainerVersion(DataflowPipelineOptions options) {
static String getDefaultContainerVersion(DataflowPipelineOptions options) {
DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
if (releaseInfo.isDevSdkVersion()) {
return dataflowRunnerInfo.getFnApiDevContainerVersion();
if (useUnifiedWorker(options)) {
return dataflowRunnerInfo.getFnApiDevContainerVersion();
}
return dataflowRunnerInfo.getLegacyDevContainerVersion();
}
return releaseInfo.getSdkVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,28 @@ public String getAlgorithm() {
void setDiskSizeGb(int value);

/** Container image used as Dataflow worker harness image. */
/** @deprecated Use {@link #getSdkContainerImage} instead. */
@Description(
"Container image to use for Dataflow V1 worker. Can only be used for official Dataflow container images. ")
"Container image used to configure a Dataflow worker. "
+ "Can only be used for official Dataflow container images. "
+ "Prefer using sdkContainerImage instead.")
@Deprecated
@Hidden
String getWorkerHarnessContainerImage();

/** @deprecated Use {@link #setSdkContainerImage} instead. */
@Deprecated
@Hidden
void setWorkerHarnessContainerImage(String value);

/**
* Container image used to configure SDK execution environment on worker. Used for custom
* containers on portable pipelines only.
*/
@Description("Container image to use for Beam Java SDK execution environment on Dataflow V2.")
@Description(
"Container image used to configure the SDK execution environment of "
+ "pipeline code on a worker. For non-portable pipelines, can only be "
+ "used for official Dataflow container images.")
String getSdkContainerImage();

void setSdkContainerImage(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();

String containerImageURL =
DataflowRunner.getV2SdkHarnessContainerImageForJob(
options.as(DataflowPipelineOptions.class));
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(containerImageURL);

Expand Down Expand Up @@ -1128,8 +1127,7 @@ public String apply(byte[] input) {
file2.deleteOnExit();
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(
Environments.createDockerEnvironment(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options))
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
.toBuilder()
.addAllDependencies(
Environments.getArtifacts(
Expand Down Expand Up @@ -1591,8 +1589,7 @@ public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception
Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());

DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
assertEquals(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
}

/**
Expand Down Expand Up @@ -1624,8 +1621,7 @@ public void testSetSdkContainerImageInPipelineProto() throws Exception {
Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());

DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
assertEquals(
DataflowRunner.getV2SdkHarnessContainerImageForJob(options), payload.getContainerImage());
assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
}

@Test
Expand Down
Loading
Loading