diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index 5472a24be7f4..ee1faf8917d9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -18,8 +18,11 @@ package org.apache.hadoop.ozone.container.replication; import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigType; @@ -62,6 +65,8 @@ public class ReplicationServer { private int port; private final ContainerImporter importer; + private ThreadPoolExecutor executor; + public ReplicationServer(ContainerController controller, ReplicationConfig replicationConfig, SecurityConfig secConf, CertificateClient caClient, ContainerImporter importer) { @@ -70,6 +75,18 @@ public ReplicationServer(ContainerController controller, this.controller = controller; this.importer = importer; this.port = replicationConfig.getPort(); + + int replicationServerWorkers = + replicationConfig.getReplicationMaxStreams(); + this.executor = + new ThreadPoolExecutor(replicationServerWorkers, + replicationServerWorkers, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ReplicationContainerReader-%d") + .build()); + init(); } @@ -79,7 +96,8 @@ public void init() { .addService(ServerInterceptors.intercept(new GrpcReplicationService( new OnDemandContainerReplicationSource(controller), importer - ), new GrpcServerInterceptor())); + ), new GrpcServerInterceptor())) + .executor(executor); if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) { try { @@ -112,6 +130,8 @@ public void start() throws IOException { public void stop() { try { + executor.shutdown(); + executor.awaitTermination(5L, TimeUnit.SECONDS); server.shutdown().awaitTermination(10L, TimeUnit.SECONDS); } catch (InterruptedException ex) { LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());