diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 3457f71aa96f..e50b94a12602 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -46,6 +46,17 @@ maven-compiler-plugin + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + + + org.apache.maven.plugins maven-dependency-plugin diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index bec9b70a8ffb..fb7ba4756c54 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -81,11 +81,26 @@ org.apache.maven.plugins maven-surefire-plugin - runnable-on-service-tests + integration-test + + test + - true + org.apache.beam.sdk.testing.RunnableOnService + none + true + + org.apache.beam:java-sdk-all + + + + [ + "--runner=org.apache.beam.runners.direct.InProcessPipelineRunner" + ] + + diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java similarity index 72% rename from runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java index 5b8e5fc6c713..74c42926b115 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java @@ -24,13 +24,14 @@ import java.util.concurrent.Executors; /** - * A {@link ExecutorServiceFactory} that produces cached thread pools via - * {@link Executors#newCachedThreadPool()}. + * A {@link ExecutorServiceFactory} that produces fixed thread pools via + * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available + * processors as provided by {@link Runtime#availableProcessors()}. */ -class CachedThreadPoolExecutorServiceFactory +class FixedThreadPoolExecutorServiceFactory implements DefaultValueFactory, ExecutorServiceFactory { - private static final CachedThreadPoolExecutorServiceFactory INSTANCE = - new CachedThreadPoolExecutorServiceFactory(); + private static final FixedThreadPoolExecutorServiceFactory INSTANCE = + new FixedThreadPoolExecutorServiceFactory(); @Override public ExecutorServiceFactory create(PipelineOptions options) { @@ -39,6 +40,6 @@ public ExecutorServiceFactory create(PipelineOptions options) { @Override public ExecutorService create() { - return Executors.newCachedThreadPool(); + return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java index 512b3bdb7c66..049852188953 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java @@ -43,13 +43,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa * it cannot enter a state in which it will not schedule additional pending work unless currently * scheduled work completes, as this may cause the {@link Pipeline} to cease processing. * - *

Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of + *

Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of * {@link Executors#newCachedThreadPool()}. */ @JsonIgnore @Required @Hidden - @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) + @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class) ExecutorServiceFactory getExecutorServiceFactory(); void setExecutorServiceFactory(ExecutorServiceFactory executorService); diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 07fd0b1be0c6..aa7edb5a4f98 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -124,6 +124,16 @@ maven-compiler-plugin + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.beam.sdk.testing.RunnableOnService + + + + org.apache.maven.plugins maven-dependency-plugin