Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ message ProcessBundleRequest {
// (Optional) A list of cache tokens that can be used by an SDK to reuse
// cached data returned by the State API across multiple bundles.
//
// Note that SDKs that can efficiently consume this field should declare
// the beam:protocol:state_caching:v1 capability enabling runners to reduce
// the amount of memory used.
//
// See https://s.apache.org/beam-fn-state-api-and-bundle-processing#heading=h.7ghoih5aig5m
// for additional details on how to use the cache token with the State API
// to cache data across bundle boundaries.
Expand Down
6 changes: 6 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,12 @@ message StandardProtocols {
// https://s.apache.org/beam-fn-api-control-data-embedding
CONTROL_REQUEST_ELEMENTS_EMBEDDING = 6
[(beam_urn) = "beam:protocol:control_request_elements_embedding:v1"];

// Indicates that this SDK can cache user state and side inputs across
// bundle boundaries. This is a hint to runners that runners can rely on the
// SDKs ability to store the data in memory reducing the amount of memory
// used overall.
STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -129,8 +130,9 @@ public static JavaVersion forSpecification(String specification) {
* container.
*/

private static final String JAVA_SDK_HARNESS_CONTAINER_URL =
getDefaultJavaSdkHarnessContainerUrl();
@VisibleForTesting
static final String JAVA_SDK_HARNESS_CONTAINER_URL = getDefaultJavaSdkHarnessContainerUrl();

public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT =
createDockerEnvironment(JAVA_SDK_HARNESS_CONTAINER_URL);

Expand Down Expand Up @@ -385,6 +387,7 @@ public static Set<String> getJavaCapabilities() {
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.HARNESS_MONITORING_INFOS));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_EMBEDDING));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.STATE_CACHING));
capabilities.add("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL);
capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core.construction;

import static org.apache.beam.runners.core.construction.Environments.JAVA_SDK_HARNESS_CONTAINER_URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
Expand Down Expand Up @@ -183,17 +184,38 @@ public void environmentConfigAndEnvironmentOptionsAreMutuallyExclusive() {

@Test
public void testCapabilities() {
// Check a subset of coders
assertThat(Environments.getJavaCapabilities(), hasItem(ModelCoders.LENGTH_PREFIX_CODER_URN));
assertThat(Environments.getJavaCapabilities(), hasItem(ModelCoders.ROW_CODER_URN));
// Check all protocol based capabilities
assertThat(
Environments.getJavaCapabilities(),
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING)));
assertThat(
Environments.getJavaCapabilities(),
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.PROGRESS_REPORTING)));
assertThat(
Environments.getJavaCapabilities(),
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.HARNESS_MONITORING_INFOS)));
assertThat(
Environments.getJavaCapabilities(),
hasItem(
BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_EMBEDDING)));
assertThat(
Environments.getJavaCapabilities(),
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.STATE_CACHING)));
// Check that SDF truncation is supported
assertThat(
Environments.getJavaCapabilities(),
hasItem(
BeamUrns.getUrn(
RunnerApi.StandardPTransforms.SplittableParDoComponents
.TRUNCATE_SIZED_RESTRICTION)));
// Check that the sdk_base is inserted
assertThat(
Environments.getJavaCapabilities(),
hasItem("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL));
// Check that ToString is supported for pretty printing user data.
assertThat(
Environments.getJavaCapabilities(),
hasItem(BeamUrns.getUrn(RunnerApi.StandardPTransforms.Primitives.TO_STRING)));
Expand Down