From b79a01ea39cb3b456510f24aa676d1d6caee31d8 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 4 Feb 2022 10:07:09 -0800 Subject: [PATCH 1/3] [BEAM-13015] Add state caching benchmark and move benchmarks to their own module. This simplifies adding unit tests and decreases the clutter in the sdks/java/harness module. --- .../beam/gradle/BeamModulePlugin.groovy | 65 ++++-- sdks/java/harness/build.gradle | 19 -- sdks/java/harness/jmh/build.gradle | 51 ++++ .../harness/jmh}/ProcessBundleBenchmark.java | 220 +++++++++++++++++- .../logging/BeamFnLoggingClientBenchmark.java | 4 +- .../fn/harness/jmh}/logging/package-info.java | 2 +- .../beam/fn/harness/jmh}/package-info.java | 2 +- .../jmh/ProcessBundleBenchmarkTest.java | 56 +++++ .../BeamFnLoggingClientBenchmarkTest.java | 55 +++++ settings.gradle.kts | 1 + 10 files changed, 430 insertions(+), 45 deletions(-) create mode 100644 sdks/java/harness/jmh/build.gradle rename sdks/java/harness/{src/jmh/java/org/apache/beam/fn/harness => jmh/src/main/java/org/apache/beam/fn/harness/jmh}/ProcessBundleBenchmark.java (57%) rename sdks/java/harness/{src/jmh/java/org/apache/beam/fn/harness => jmh/src/main/java/org/apache/beam/fn/harness/jmh}/logging/BeamFnLoggingClientBenchmark.java (97%) rename sdks/java/harness/{src/jmh/java/org/apache/beam/fn/harness => jmh/src/main/java/org/apache/beam/fn/harness/jmh}/logging/package-info.java (94%) rename sdks/java/harness/{src/jmh/java/org/apache/beam/fn/harness => jmh/src/main/java/org/apache/beam/fn/harness/jmh}/package-info.java (95%) create mode 100644 sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java create mode 100644 sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmarkTest.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 960a89e2cb16..2fea16119548 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -130,13 +130,15 @@ class BeamModulePlugin implements Plugin { boolean validateShadowJar = true /** - * Controls whether the 'jmh' source set is enabled for JMH benchmarks. + * Controls whether 'jmh' specific configuration is enabled to build a JMH + * focused module. * - * Add additional dependencies to the jmhCompile and jmhRuntime dependency - * sets. + * Add additional dependencies to the implementation configuration. * * Note that the JMH annotation processor is enabled by default and that * a 'jmh' task is created which executes JMH. + * + * Publishing is not allowed for JMH enabled projects. */ boolean enableJmh = false @@ -480,7 +482,7 @@ class BeamModulePlugin implements Plugin { def spotbugs_version = "4.0.6" def testcontainers_version = "1.15.1" def arrow_version = "5.0.0" - def jmh_version = "1.32" + def jmh_version = "1.34" // A map of maps containing common libraries used per language. To use: // dependencies { @@ -793,6 +795,11 @@ class BeamModulePlugin implements Plugin { // Use the implicit it parameter of the closure to handle zero argument or one argument map calls. JavaNatureConfiguration configuration = it ? it as JavaNatureConfiguration : new JavaNatureConfiguration() + // Validate configuration + if (configuration.enableJmh && configuration.publish) { + throw new GradleException("Publishing of a benchmark project is not allowed. Benchmark projects are not meant to be consumed as artifacts for end users."); + } + if (configuration.archivesBaseName) { project.archivesBaseName = configuration.archivesBaseName } @@ -1325,25 +1332,12 @@ class BeamModulePlugin implements Plugin { } if (configuration.enableJmh) { - // We specifically use a separate source set for JMH to ensure that it does not - // become a required artifact - project.sourceSets { - jmh { - java { - srcDir "src/jmh/java" - } - resources { - srcDir "src/jmh/resources" - } - } - } - project.dependencies { - jmhAnnotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmh_version" - jmhImplementation "org.openjdk.jmh:jmh-core:$jmh_version" + annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmh_version" + implementation "org.openjdk.jmh:jmh-core:$jmh_version" } - project.compileJmhJava { + project.compileJava { // Always exclude checkerframework on JMH generated code. It's slow, // and it often raises erroneous error because we don't have checker // annotations for generated code and test libraries. @@ -1355,9 +1349,9 @@ class BeamModulePlugin implements Plugin { } } - project.task("jmh", type: JavaExec, dependsOn: project.jmhClasses, { + project.task("jmh", type: JavaExec, dependsOn: project.classes, { mainClass = "org.openjdk.jmh.Main" - classpath = project.sourceSets.jmh.compileClasspath + project.sourceSets.jmh.runtimeClasspath + classpath = project.sourceSets.main.compileClasspath + project.sourceSets.main.runtimeClasspath // For a list of arguments, see // https://github.com/guozheng/jmh-tutorial/blob/master/README.md // @@ -1372,7 +1366,34 @@ class BeamModulePlugin implements Plugin { // Enable connecting a debugger by disabling forking (uncomment below) // Useful for debugging via an IDE such as Intellij // args '-f0' + // Specify -Pbenchmark=ProcessBundleBenchmark.testTinyBundle on the command + // line to enable running a single benchmark. + + // Enable Google Cloud Profiler and upload the benchmarks to GCP. + if (project.hasProperty("benchmark")) { + args project.getProperty("benchmark") + // Add JVM arguments allowing one to additionally use Google's Java Profiler + // Agent: (see https://cloud.google.com/profiler/docs/profiling-java#installing-profiler for instructions on how to install) + if (project.file("/opt/cprof/profiler_java_agent.so").exists()) { + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def userName = System.getProperty("user.name").toLowerCase().replaceAll(" ", "_") + jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a' + } + } + args '-foe true' + }) + + // Single shot of JMH benchmarks ensures that they can execute. + // + // Note that these tests will fail on JVMs that JMH doesn't support. + project.task("jmhTest", type: JavaExec, dependsOn: project.classes, { + mainClass = "org.openjdk.jmh.Main" + classpath = project.sourceSets.main.compileClasspath + project.sourceSets.main.runtimeClasspath + + args '-bm ss' + args '-foe true' }) + project.check.dependsOn("jmhTest") } project.ext.includeInJavaBom = configuration.publish diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 91f8a39f5eaa..818c9a084755 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -33,7 +33,6 @@ applyJavaNature( ], automaticModuleName: 'org.apache.beam.fn.harness', validateShadowJar: false, - enableJmh: true, testShadowJar: true, shadowClosure: // Create an uber jar without repackaging for the SDK harness @@ -76,22 +75,4 @@ dependencies { testImplementation project(":runners:core-construction-java") testImplementation project(path: ":sdks:java:fn-execution", configuration: "testRuntimeMigration") shadowTestRuntimeClasspath library.java.slf4j_jdk14 - jmhImplementation project(path: ":sdks:java:harness", configuration: "shadow") - jmhImplementation project(":runners:java-fn-execution") - jmhRuntimeOnly library.java.slf4j_jdk14 -} - -jmh { - // Specify -Pbenchmark=ProcessBundleBenchmark.testTinyBundle on the command - // line to enable running a single benchmark. - if (project.hasProperty("benchmark")) { - args project.getProperty("benchmark") - // Add JVM arguments allowing one to additionally use Google's Java Profiler - // Agent: (see https://cloud.google.com/profiler/docs/profiling-java#installing-profiler for instructions on how to install) - if (file("/opt/cprof/profiler_java_agent.so").exists()) { - def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' - def userName = System.getProperty("user.name").toLowerCase().replaceAll(" ", "_") - jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a' - } - } } diff --git a/sdks/java/harness/jmh/build.gradle b/sdks/java/harness/jmh/build.gradle new file mode 100644 index 000000000000..4d50c717b555 --- /dev/null +++ b/sdks/java/harness/jmh/build.gradle @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + automaticModuleName: 'org.apache.beam.fn.harness.jmh', + enableJmh: true, + publish: false) + +description = "Apache Beam :: SDKs :: Java :: Harness :: JMH" +ext.summary = "This contains JMH benchmarks for the SDK Fn Harness for Beam Java" + +configurations { + jammAgent +} + +dependencies { + implementation project(path: ":sdks:java:harness", configuration: "shadow") + implementation project(":runners:java-fn-execution") + runtimeOnly library.java.slf4j_jdk14 + jammAgent library.java.jamm +} + +jmh { + configurations.jammAgent.resolvedConfiguration.files.each { + jvmArgs '-javaagent:' + it + } +} + +jmhTest { + configurations.jammAgent.resolvedConfiguration.files.each { + jvmArgs '-javaagent:' + it + } +} + diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java similarity index 57% rename from sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java rename to sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java index e44a704b4851..7612be28e698 100644 --- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java @@ -15,23 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.fn.harness; +package org.apache.beam.fn.harness.jmh; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertEquals; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.FnHarness; import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest.CacheToken; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; @@ -53,6 +64,7 @@ import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; import org.apache.beam.runners.fnexecution.logging.LogWriter; import org.apache.beam.runners.fnexecution.state.GrpcStateService; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -66,12 +78,18 @@ import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.openjdk.jmh.annotations.Benchmark; @@ -217,6 +235,11 @@ public TrivialTransform() { .apply("gbk", GroupByKey.create()); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + // The fuser will create one stage (SDK responsible portion in []): + // (Impulse + [create + len + addKeys] + GBK write) + // + // We use the one stage containing the DoFns and run a benchmark expecting the SDK + // to accept byte[] and output KV. FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto); checkState(fused.getFusedStages().size() == 1, "Expected exactly one fused stage"); ExecutableStage stage = fused.getFusedStages().iterator().next(); @@ -283,4 +306,199 @@ public void testLargeBundle(TrivialTransform trivialTransform) throws Exception } assertEquals(3_000, outputValuesCount.getAndSet(0)); } + + @State(Scope.Benchmark) + public static class StatefulTransform extends SdkHarness { + final BundleProcessor processor; + final ExecutableProcessBundleDescriptor descriptor; + final StateRequestHandler nonCachingStateRequestHandler; + final StateRequestHandler cachingStateRequestHandler; + + @SuppressWarnings({ + "unused" // TODO(BEAM-13271): Remove when new version of errorprone is released (2.11.0) + }) + private static class StatefulOutputZeroOneTwo + extends DoFn, KV> { + private static final String STATE_ID = "bagState"; + + @StateId(STATE_ID) + private final StateSpec> bagStateSpec = StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext ctxt, @StateId(STATE_ID) BagState state) { + int size = Iterables.size(state.read()); + if (size == 3) { + for (String value : state.read()) { + ctxt.output(KV.of(ctxt.element().getKey(), value)); + } + state.clear(); + } else { + state.add(ctxt.element().getValue()); + } + } + } + + private static class ToKeyAndValueDoFn extends DoFn> { + @ProcessElement + public void process(ProcessContext ctxt) { + ctxt.output(KV.of("key", "value")); + } + } + + public StatefulTransform() { + try { + Pipeline p = Pipeline.create(); + p.apply("impulse", Impulse.create()) + .apply("toKeyAndValue", ParDo.of(new ToKeyAndValueDoFn())) + .apply("stateful", ParDo.of(new StatefulOutputZeroOneTwo())) + // Force the output to be materialized + .apply("gbk", GroupByKey.create()); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + // The fuser will break up the pipeline into two stages (SDK responsible portion in []): + // (Impulse + [toKeyAndValue]) -> ([stateful] + GBK write) + // + // We pull out the stage containing the stateful DoFn and run a benchmark expecting the SDK + // to accept KV and output KV. + FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto); + checkState(fused.getFusedStages().size() == 2, "Expected exactly two fused stages"); + ExecutableStage stage = null; + for (ExecutableStage value : fused.getFusedStages()) { + if (!value.getUserStates().isEmpty()) { + stage = value; + break; + } + } + if (stage == null) { + throw new IllegalStateException("Stage with stateful DoFn not found."); + } + + this.descriptor = + ProcessBundleDescriptors.fromExecutableStage( + "my_stage", + stage, + dataServer.getApiServiceDescriptor(), + stateServer.getApiServiceDescriptor()); + + this.processor = + controlClient.getProcessor( + descriptor.getProcessBundleDescriptor(), + descriptor.getRemoteInputDestinations(), + stateDelegator); + this.nonCachingStateRequestHandler = new InMemoryBagUserStateHandler(); + + List cacheTokens = new ArrayList<>(); + cacheTokens.add( + CacheToken.newBuilder() + .setUserState(CacheToken.UserState.newBuilder()) + .setToken(ByteString.copyFromUtf8("cacheMe")) + .build()); + this.cachingStateRequestHandler = + new InMemoryBagUserStateHandler() { + @Override + public Iterable getCacheTokens() { + return cacheTokens; + } + }; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static class InMemoryBagUserStateHandler implements StateRequestHandler { + private final Map bagState = new ConcurrentHashMap<>(); + + @Override + public CompletionStage handle(StateRequest request) throws Exception { + if (!request.getStateKey().hasBagUserState()) { + throw new IllegalStateException( + "Unknown state key type " + request.getStateKey().getTypeCase()); + } + StateResponse.Builder response = StateResponse.newBuilder(); + StateKey.BagUserState stateKey = request.getStateKey().getBagUserState(); + switch (request.getRequestCase()) { + case APPEND: + { + ByteString data = + bagState.computeIfAbsent(stateKey.getKey(), (unused) -> ByteString.EMPTY); + bagState.put(stateKey.getKey(), data.concat(request.getAppend().getData())); + response.getAppendBuilder().build(); + break; + } + case CLEAR: + { + bagState.remove(stateKey.getKey()); + response.getClearBuilder().build(); + break; + } + case GET: + { + ByteString data = + bagState.computeIfAbsent(stateKey.getKey(), (unused) -> ByteString.EMPTY); + response.getGetBuilder().setData(data).build(); + break; + } + default: + throw new IllegalStateException("Unknown request type " + request.getRequestCase()); + } + return CompletableFuture.completedFuture(response); + } + } + + @Benchmark + @Threads(16) // Use several threads since we expect contention during bundle processing. + public void testStateWithoutCaching(StatefulTransform statefulTransform) throws Exception { + testState(statefulTransform, statefulTransform.nonCachingStateRequestHandler); + } + + @Benchmark + @Threads(16) // Use several threads since we expect contention during bundle processing. + public void testStateWithCaching(StatefulTransform statefulTransform) throws Exception { + testState(statefulTransform, statefulTransform.cachingStateRequestHandler); + } + + private static void testState( + StatefulTransform statefulTransform, StateRequestHandler stateRequestHandler) + throws Exception { + Map>> remoteOutputCoders = + statefulTransform.descriptor.getRemoteOutputCoders(); + Map> outputReceivers = new HashMap<>(); + AtomicInteger outputValuesCount = new AtomicInteger(); + for (Entry>> remoteOutputCoder : + remoteOutputCoders.entrySet()) { + outputReceivers.put( + remoteOutputCoder.getKey(), + RemoteOutputReceiver.of( + (Coder) remoteOutputCoder.getValue(), + (FnDataReceiver>) + (WindowedValue value) -> outputValuesCount.incrementAndGet())); + } + String key = Strings.padStart(Long.toHexString(Thread.currentThread().getId()), 16, '0'); + try (RemoteBundle bundle = + statefulTransform.processor.newBundle( + outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) { + Iterables.getOnlyElement(bundle.getInputReceivers().values()) + .accept(valueInGlobalWindow(KV.of(key, "zero"))); + } + try (RemoteBundle bundle = + statefulTransform.processor.newBundle( + outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) { + Iterables.getOnlyElement(bundle.getInputReceivers().values()) + .accept(valueInGlobalWindow(KV.of(key, "one"))); + } + try (RemoteBundle bundle = + statefulTransform.processor.newBundle( + outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) { + Iterables.getOnlyElement(bundle.getInputReceivers().values()) + .accept(valueInGlobalWindow(KV.of(key, "two"))); + } + try (RemoteBundle bundle = + statefulTransform.processor.newBundle( + outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) { + Iterables.getOnlyElement(bundle.getInputReceivers().values()) + .accept(valueInGlobalWindow(KV.of(key, "flush"))); + } + assertEquals(3, outputValuesCount.getAndSet(0)); + } } diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java similarity index 97% rename from sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java rename to sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java index 6ed382c7db32..ca9648b4b1d5 100644 --- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.fn.harness.logging; +package org.apache.beam.fn.harness.jmh.logging; import java.io.Closeable; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; +import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/package-info.java similarity index 94% rename from sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java rename to sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/package-info.java index 304238ed26ad..656592f3ebbd 100644 --- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/package-info.java @@ -17,4 +17,4 @@ */ /** Benchmarks for logging. */ -package org.apache.beam.fn.harness.logging; +package org.apache.beam.fn.harness.jmh.logging; diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/package-info.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/package-info.java similarity index 95% rename from sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/package-info.java rename to sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/package-info.java index d61e5f5cca20..56e401ff1ddd 100644 --- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/package-info.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/package-info.java @@ -17,4 +17,4 @@ */ /** Benchmarks for the SDK harness. */ -package org.apache.beam.fn.harness; +package org.apache.beam.fn.harness.jmh; diff --git a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java new file mode 100644 index 000000000000..e5541f2b1717 --- /dev/null +++ b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.jmh; + +import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.StatefulTransform; +import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.TrivialTransform; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ProcessBundleBenchmark}. */ +@RunWith(JUnit4.class) +public class ProcessBundleBenchmarkTest { + @Test + public void testTinyBundle() throws Exception { + TrivialTransform transform = new TrivialTransform(); + new ProcessBundleBenchmark().testTinyBundle(transform); + transform.tearDown(); + } + + @Test + public void testLargeBundle() throws Exception { + TrivialTransform transform = new TrivialTransform(); + new ProcessBundleBenchmark().testLargeBundle(transform); + transform.tearDown(); + } + + @Test + public void testStateWithoutCaching() throws Exception { + StatefulTransform transform = new StatefulTransform(); + new ProcessBundleBenchmark().testStateWithoutCaching(transform); + transform.tearDown(); + } + + @Test + public void testStateWithCaching() throws Exception { + StatefulTransform transform = new StatefulTransform(); + new ProcessBundleBenchmark().testStateWithCaching(transform); + transform.tearDown(); + } +} diff --git a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmarkTest.java b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmarkTest.java new file mode 100644 index 000000000000..6189c9275680 --- /dev/null +++ b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmarkTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.jmh.logging; + +import org.apache.beam.fn.harness.jmh.logging.BeamFnLoggingClientBenchmark.ManageExecutionState; +import org.apache.beam.fn.harness.jmh.logging.BeamFnLoggingClientBenchmark.ManyExpectedCallsLoggingClientAndService; +import org.apache.beam.fn.harness.jmh.logging.BeamFnLoggingClientBenchmark.ZeroExpectedCallsLoggingClientAndService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BeamFnLoggingClientBenchmark}. */ +@RunWith(JUnit4.class) +public class BeamFnLoggingClientBenchmarkTest { + @Test + public void testLogging() throws Exception { + ManyExpectedCallsLoggingClientAndService service = + new ManyExpectedCallsLoggingClientAndService(); + new BeamFnLoggingClientBenchmark().testLogging(service); + service.tearDown(); + } + + @Test + public void testLoggingWithAllOptionalParameters() throws Exception { + ManyExpectedCallsLoggingClientAndService service = + new ManyExpectedCallsLoggingClientAndService(); + ManageExecutionState state = new ManageExecutionState(); + new BeamFnLoggingClientBenchmark().testLoggingWithAllOptionalParameters(service, state); + state.tearDown(); + service.tearDown(); + } + + @Test + public void testSkippedLogging() throws Exception { + ZeroExpectedCallsLoggingClientAndService service = + new ZeroExpectedCallsLoggingClientAndService(); + new BeamFnLoggingClientBenchmark().testSkippedLogging(service); + service.tearDown(); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c97165121904..ec64891fac3c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -140,6 +140,7 @@ include(":sdks:java:extensions:sql:udf-test-provider") include(":sdks:java:extensions:zetasketch") include(":sdks:java:fn-execution") include(":sdks:java:harness") +include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") include(":sdks:java:io:amqp") From c9e43c71efa37c4125cafee43ca7457bc75bebf4 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 4 Feb 2022 10:47:33 -0800 Subject: [PATCH 2/3] Make jmhTest run only one iteration without forking to speed it up. Also clean-up comments around args and fix classpath to only be runtime classpath. --- .../beam/gradle/BeamModulePlugin.groovy | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 2fea16119548..9a3b85208582 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1351,21 +1351,16 @@ class BeamModulePlugin implements Plugin { project.task("jmh", type: JavaExec, dependsOn: project.classes, { mainClass = "org.openjdk.jmh.Main" - classpath = project.sourceSets.main.compileClasspath + project.sourceSets.main.runtimeClasspath + classpath = project.sourceSets.main.runtimeClasspath // For a list of arguments, see // https://github.com/guozheng/jmh-tutorial/blob/master/README.md // - // Filter for a specific benchmark to run (uncomment below) - // Note that multiple regex are supported each as a separate argument. - // args 'BeamFnLoggingClientBenchmark.testLoggingWithAllOptionalParameters' - // args 'additional regexp...' - // - // Enumerate available benchmarks and exit (uncomment below) + // Enumerate available benchmarks and exit (uncomment below and disable other args) // args '-l' // - // Enable connecting a debugger by disabling forking (uncomment below) + // Enable connecting a debugger by disabling forking (uncomment below and disable other args) // Useful for debugging via an IDE such as Intellij - // args '-f0' + // args '-f=0' // Specify -Pbenchmark=ProcessBundleBenchmark.testTinyBundle on the command // line to enable running a single benchmark. @@ -1379,8 +1374,13 @@ class BeamModulePlugin implements Plugin { def userName = System.getProperty("user.name").toLowerCase().replaceAll(" ", "_") jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a' } + } else { + // We filter for only Apache Beam benchmarks to ensure that we aren't + // running benchmarks that may have been packaged from another library + // that ends up on the runtime classpath. + args 'org.apache.beam' } - args '-foe true' + args '-foe=true' }) // Single shot of JMH benchmarks ensures that they can execute. @@ -1388,10 +1388,17 @@ class BeamModulePlugin implements Plugin { // Note that these tests will fail on JVMs that JMH doesn't support. project.task("jmhTest", type: JavaExec, dependsOn: project.classes, { mainClass = "org.openjdk.jmh.Main" - classpath = project.sourceSets.main.compileClasspath + project.sourceSets.main.runtimeClasspath - - args '-bm ss' - args '-foe true' + classpath = project.sourceSets.main.runtimeClasspath + + // We filter for only Apache Beam benchmarks to ensure that we aren't + // running benchmarks that may have been packaged from another library + // that ends up on the runtime classpath. + args 'org.apache.beam' + args '-bm=ss' + args '-i=1' + args '-f=0' + args '-wf=0' + args '-foe=true' }) project.check.dependsOn("jmhTest") } From f64f5051ed2a7181e87f88b49a5b6ef349a27f95 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 4 Feb 2022 15:06:30 -0800 Subject: [PATCH 3/3] increase timeout to 60 seconds --- .../org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java index 7612be28e698..6017e5f0277d 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java @@ -172,7 +172,7 @@ public void log(LogEntry entry) { } }); InstructionRequestHandler controlClient = - clientPool.getSource().take(WORKER_ID, java.time.Duration.ofSeconds(2)); + clientPool.getSource().take(WORKER_ID, java.time.Duration.ofSeconds(60)); this.controlClient = SdkHarnessClient.usingFnApiClient(controlClient, dataServer.getService()); } catch (Exception e) {