Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import java.util.concurrent.Executors;

/**
* A {@link ExecutorServiceFactory} that produces cached thread pools via
* {@link Executors#newCachedThreadPool()}.
* A {@link ExecutorServiceFactory} that produces fixed thread pools via
* {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available
* processors as provided by {@link Runtime#availableProcessors()}.
*/
class CachedThreadPoolExecutorServiceFactory
class FixedThreadPoolExecutorServiceFactory
implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
new CachedThreadPoolExecutorServiceFactory();
private static final FixedThreadPoolExecutorServiceFactory INSTANCE =
new FixedThreadPoolExecutorServiceFactory();

@Override
public ExecutorServiceFactory create(PipelineOptions options) {
Expand All @@ -37,6 +38,6 @@ public ExecutorServiceFactory create(PipelineOptions options) {

@Override
public ExecutorService create() {
return Executors.newCachedThreadPool();
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa
* it cannot enter a state in which it will not schedule additional pending work unless currently
* scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
*
* <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of
* <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
* {@link Executors#newCachedThreadPool()}.
*/
@JsonIgnore
@Required
@Hidden
@Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class)
@Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
ExecutorServiceFactory getExecutorServiceFactory();

void setExecutorServiceFactory(ExecutorServiceFactory executorService);
Expand Down