From 9a3fe3433062052138c0e5fbfc4a081b3c8e3715 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 10 May 2016 15:09:00 -0700 Subject: [PATCH 1/3] Switch to using a FixedThreadPool by default Reduces the number of active threads, which should improve scheduling behavior. Additionally improves caching behavior of ParDo evaluators due to increased reuse of DoFns. --- ...a => FixedThreadPoolExecutorServiceFactory.java} | 13 +++++++------ .../runners/direct/InProcessPipelineOptions.java | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) rename runners/direct-java/src/main/java/org/apache/beam/runners/direct/{CachedThreadPoolExecutorServiceFactory.java => FixedThreadPoolExecutorServiceFactory.java} (72%) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java similarity index 72% rename from runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java index 5b8e5fc6c713..74c42926b115 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java @@ -24,13 +24,14 @@ import java.util.concurrent.Executors; /** - * A {@link ExecutorServiceFactory} that produces cached thread pools via - * {@link Executors#newCachedThreadPool()}. + * A {@link ExecutorServiceFactory} that produces fixed thread pools via + * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available + * processors as provided by {@link Runtime#availableProcessors()}. */ -class CachedThreadPoolExecutorServiceFactory +class FixedThreadPoolExecutorServiceFactory implements DefaultValueFactory, ExecutorServiceFactory { - private static final CachedThreadPoolExecutorServiceFactory INSTANCE = - new CachedThreadPoolExecutorServiceFactory(); + private static final FixedThreadPoolExecutorServiceFactory INSTANCE = + new FixedThreadPoolExecutorServiceFactory(); @Override public ExecutorServiceFactory create(PipelineOptions options) { @@ -39,6 +40,6 @@ public ExecutorServiceFactory create(PipelineOptions options) { @Override public ExecutorService create() { - return Executors.newCachedThreadPool(); + return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java index 512b3bdb7c66..049852188953 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java @@ -43,13 +43,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa * it cannot enter a state in which it will not schedule additional pending work unless currently * scheduled work completes, as this may cause the {@link Pipeline} to cease processing. * - *

Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of + *

Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of * {@link Executors#newCachedThreadPool()}. */ @JsonIgnore @Required @Hidden - @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) + @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class) ExecutorServiceFactory getExecutorServiceFactory(); void setExecutorServiceFactory(ExecutorServiceFactory executorService); From 113e2573938d21040a89c02782404ccfb45fcdfd Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 10 May 2016 13:36:21 -0700 Subject: [PATCH 2/3] Enable RunnableOnService tests for the Direct Runner Explicitly remove beamTestPipelineOptions in examples. Otherwise, running the examples after a PipelineRunner with --- examples/java/pom.xml | 11 +++++++++++ runners/direct-java/pom.xml | 19 +++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 3457f71aa96f..e50b94a12602 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -46,6 +46,17 @@ maven-compiler-plugin + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + + + org.apache.maven.plugins maven-dependency-plugin diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index bec9b70a8ffb..fb7ba4756c54 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -81,11 +81,26 @@ org.apache.maven.plugins maven-surefire-plugin - runnable-on-service-tests + integration-test + + test + - true + org.apache.beam.sdk.testing.RunnableOnService + none + true + + org.apache.beam:java-sdk-all + + + + [ + "--runner=org.apache.beam.runners.direct.InProcessPipelineRunner" + ] + + From e406949eea82c705343736d7fc685182ddd96921 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 12 May 2016 10:08:33 -0700 Subject: [PATCH 3/3] Stop running RunnableOnService tests in the Core SDK With the direct runner executing all of this category (in runners/direct-java), we maintain this test coverage without running these tests while building the Core SDK. Required to remove the legacy direct runner. --- sdks/java/core/pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 07fd0b1be0c6..aa7edb5a4f98 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -124,6 +124,16 @@ maven-compiler-plugin + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.beam.sdk.testing.RunnableOnService + + + + org.apache.maven.plugins maven-dependency-plugin