From e20418b4f33fc1aeb3da3597e928ca69bc995ecd Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 10 May 2016 15:09:00 -0700 Subject: [PATCH] Switch to using a FixedThreadPool by default Reduces the number of active threads, which should improve scheduling behavior. Additionally improves caching behavior of ParDo evaluators due to increased reuse of DoFns. --- ...a => FixedThreadPoolExecutorServiceFactory.java} | 13 +++++++------ .../runners/inprocess/InProcessPipelineOptions.java | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) rename sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/{CachedThreadPoolExecutorServiceFactory.java => FixedThreadPoolExecutorServiceFactory.java} (69%) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FixedThreadPoolExecutorServiceFactory.java similarity index 69% rename from sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java rename to sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FixedThreadPoolExecutorServiceFactory.java index 3350d2b4d5..d71b1b0ae8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FixedThreadPoolExecutorServiceFactory.java @@ -22,13 +22,14 @@ import java.util.concurrent.Executors; /** - * A {@link ExecutorServiceFactory} that produces cached thread pools via - * {@link Executors#newCachedThreadPool()}. + * A {@link ExecutorServiceFactory} that produces fixed thread pools via + * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available + * processors as provided by {@link Runtime#availableProcessors()}. 
*/ -class CachedThreadPoolExecutorServiceFactory +class FixedThreadPoolExecutorServiceFactory implements DefaultValueFactory, ExecutorServiceFactory { - private static final CachedThreadPoolExecutorServiceFactory INSTANCE = - new CachedThreadPoolExecutorServiceFactory(); + private static final FixedThreadPoolExecutorServiceFactory INSTANCE = + new FixedThreadPoolExecutorServiceFactory(); @Override public ExecutorServiceFactory create(PipelineOptions options) { @@ -37,6 +38,6 @@ public ExecutorServiceFactory create(PipelineOptions options) { @Override public ExecutorService create() { - return Executors.newCachedThreadPool(); + return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index 9c10510bf9..e904ec46ce 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -41,13 +41,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa * it cannot enter a state in which it will not schedule additional pending work unless currently * scheduled work completes, as this may cause the {@link Pipeline} to cease processing. * - *

Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of + *

Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of - * {@link Executors#newCachedThreadPool()}. + * {@link Executors#newFixedThreadPool(int)}. */ @JsonIgnore @Required @Hidden - @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) + @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class) ExecutorServiceFactory getExecutorServiceFactory(); void setExecutorServiceFactory(ExecutorServiceFactory executorService);