From 4dfbd66a9840497d9a0fd1507bc14dda718e8e39 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 18 Jan 2022 10:11:46 -0800 Subject: [PATCH] [BEAM-13015] Add state caching capability to be used as hint for runners to not duplicate cached data if the SDK can do it for user state and side inputs. --- .../src/main/proto/beam_fn_api.proto | 4 ++++ .../src/main/proto/beam_runner_api.proto | 6 +++++ .../core/construction/Environments.java | 7 ++++-- .../core/construction/EnvironmentsTest.java | 22 +++++++++++++++++++ 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto index 82f9c23a12f5..f203baac66a9 100644 --- a/model/fn-execution/src/main/proto/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/beam_fn_api.proto @@ -300,6 +300,10 @@ message ProcessBundleRequest { // (Optional) A list of cache tokens that can be used by an SDK to reuse // cached data returned by the State API across multiple bundles. // + // Note that SDKs that can efficiently consume this field should declare + // the beam:protocol:state_caching:v1 capability enabling runners to reduce + // the amount of memory used. + // // See https://s.apache.org/beam-fn-state-api-and-bundle-processing#heading=h.7ghoih5aig5m // for additional details on how to use the cache token with the State API // to cache data across bundle boundaries. diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 699da10d04c9..539c9b83c350 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1581,6 +1581,12 @@ message StandardProtocols { // https://s.apache.org/beam-fn-api-control-data-embedding CONTROL_REQUEST_ELEMENTS_EMBEDDING = 6 [(beam_urn) = "beam:protocol:control_request_elements_embedding:v1"]; + + // Indicates that this SDK can cache user state and side inputs across + // bundle boundaries. This is a hint to runners that runners can rely on the + // SDKs ability to store the data in memory reducing the amount of memory + // used overall. + STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"]; } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java index e75d9886ba06..73acbec451af 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -129,8 +130,9 @@ public static JavaVersion forSpecification(String specification) { * container. */ - private static final String JAVA_SDK_HARNESS_CONTAINER_URL = - getDefaultJavaSdkHarnessContainerUrl(); + @VisibleForTesting + static final String JAVA_SDK_HARNESS_CONTAINER_URL = getDefaultJavaSdkHarnessContainerUrl(); + public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT = createDockerEnvironment(JAVA_SDK_HARNESS_CONTAINER_URL); @@ -385,6 +387,7 @@ public static Set getJavaCapabilities() { capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.HARNESS_MONITORING_INFOS)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_EMBEDDING)); + capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.STATE_CACHING)); capabilities.add("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL); capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION)); capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING)); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java index d980d4a01639..3239bbb6fecd 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.construction; +import static org.apache.beam.runners.core.construction.Environments.JAVA_SDK_HARNESS_CONTAINER_URL; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -183,17 +184,38 @@ public void environmentConfigAndEnvironmentOptionsAreMutuallyExclusive() { @Test public void testCapabilities() { + // Check a subset of coders assertThat(Environments.getJavaCapabilities(), hasItem(ModelCoders.LENGTH_PREFIX_CODER_URN)); assertThat(Environments.getJavaCapabilities(), hasItem(ModelCoders.ROW_CODER_URN)); + // Check all protocol based capabilities assertThat( Environments.getJavaCapabilities(), hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING))); + assertThat( + Environments.getJavaCapabilities(), + hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.PROGRESS_REPORTING))); + assertThat( + Environments.getJavaCapabilities(), + hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.HARNESS_MONITORING_INFOS))); + assertThat( + Environments.getJavaCapabilities(), + hasItem( + BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.CONTROL_REQUEST_ELEMENTS_EMBEDDING))); + assertThat( + Environments.getJavaCapabilities(), + hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.STATE_CACHING))); + // Check that SDF truncation is supported assertThat( Environments.getJavaCapabilities(), hasItem( BeamUrns.getUrn( RunnerApi.StandardPTransforms.SplittableParDoComponents .TRUNCATE_SIZED_RESTRICTION))); + // Check that the sdk_base is inserted + assertThat( + Environments.getJavaCapabilities(), + hasItem("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL)); + // Check that ToString is supported for pretty printing user data. assertThat( Environments.getJavaCapabilities(), hasItem(BeamUrns.getUrn(RunnerApi.StandardPTransforms.Primitives.TO_STRING)));