Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.tron.common.es;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "common")
public class ExecutorServiceManager {

public static ExecutorService newSingleThreadExecutor(String name) {
return newSingleThreadExecutor(name, false);
}

public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}


public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
return newSingleThreadScheduledExecutor(name, false);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name,
boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}

public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
if (pool == null) {
return;
}
logger.info("Pool {} shutdown...", name);
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there are two waits of 60s, will the 60s time be too long?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60s is the maximum value to be waited until the pool stops, if it doesn't, the pool may have a problem executing the task and needs to be fixed.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.warn("Pool {} did not terminate", name);
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
logger.info("Pool {} shutdown done", name);
}
}
20 changes: 10 additions & 10 deletions consensus/src/main/java/org/tron/consensus/dpos/DposTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;

import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutorService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Sha256Hash;
Expand All @@ -34,16 +36,18 @@ public class DposTask {
@Setter
private DposService dposService;

private Thread produceThread;
private ExecutorService produceExecutor;

private final String name = "DPosMiner";

private volatile boolean isRunning = true;

public void init() {

if (!dposService.isEnable() || StringUtils.isEmpty(dposService.getMiners())) {
if (!dposService.isEnable() || ObjectUtils.isEmpty(dposService.getMiners())) {
return;
}

produceExecutor = ExecutorServiceManager.newSingleThreadExecutor(name);
Runnable runnable = () -> {
while (isRunning) {
try {
Expand All @@ -67,17 +71,13 @@ public void init() {
}
}
};
produceThread = new Thread(runnable, "DPosMiner");
produceThread.start();
produceExecutor.submit(runnable);
logger.info("DPoS task started.");
}

public void stop() {
isRunning = false;
if (produceThread != null) {
produceThread.interrupt();
}
logger.info("DPoS task stopped.");
ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name);
}

private State produceBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.backup.BackupManager;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.p2p.stats.TrafficStats;

Expand All @@ -29,20 +31,24 @@ public class BackupServer {

private volatile boolean shutdown = false;

private final String name = "BackupServer";
private ExecutorService executor;

@Autowired
public BackupServer(final BackupManager backupManager) {
this.backupManager = backupManager;
}

public void initServer() {
if (port > 0 && commonParameter.getBackupMembers().size() > 0) {
new Thread(() -> {
executor = ExecutorServiceManager.newSingleThreadExecutor(name);
executor.submit(() -> {
try {
start();
} catch (Exception e) {
logger.error("Start backup server failed, {}", e);
}
}, "BackupServer").start();
});
}
}

Expand Down Expand Up @@ -88,12 +94,14 @@ public void initChannel(NioDatagramChannel ch)
public void close() {
logger.info("Closing backup server...");
shutdown = true;
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
if (channel != null) {
try {
channel.close().await(10, TimeUnit.SECONDS);
} catch (Exception e) {
logger.warn("Closing backup server failed.", e);
}
}
logger.info("Backup server closed.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public class TronLogShutdownHook extends ShutdownHookBase {
private static final Duration CHECK_SHUTDOWN_DELAY = Duration.buildByMilliseconds(100);

/**
* The check times before shutdown. default is 50
* The check times before shutdown. default is 60000/100 = 600 times.
*/
private Integer check_times = 50;
private final long check_times = 60 * 1000 / CHECK_SHUTDOWN_DELAY.getMilliseconds();

public TronLogShutdownHook() {
}
Expand Down
25 changes: 17 additions & 8 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.tron.api.GrpcAPI.TransactionInfoList;
import org.tron.common.args.GenesisBlock;
import org.tron.common.bloom.Bloom;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.FilterQuery;
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
Expand Down Expand Up @@ -253,6 +254,13 @@ public class Manager {
private AtomicInteger blockWaitLock = new AtomicInteger(0);
private Object transactionLock = new Object();

private ExecutorService rePushEs;
private static final String rePushEsName = "repush";
private ExecutorService triggerEs;
private static final String triggerEsName = "event-trigger";
private ExecutorService filterEs;
private static final String filterEsName = "filter";

/**
* Cycle thread to rePush Transactions
*/
Expand Down Expand Up @@ -429,14 +437,17 @@ public BlockingQueue<TransactionCapsule> getRePushTransactions() {

public void stopRePushThread() {
isRunRePushThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(rePushEs, rePushEsName);
}

public void stopRePushTriggerThread() {
isRunTriggerCapsuleProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(triggerEs, triggerEsName);
}

public void stopFilterProcessThread() {
isRunFilterProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(filterEs, filterEsName);
}

@PostConstruct
Expand Down Expand Up @@ -524,21 +535,19 @@ public void init() {
revokingStore.enable();
validateSignService = Executors
.newFixedThreadPool(Args.getInstance().getValidateSignThreadNum());
Thread rePushThread = new Thread(rePushLoop);
rePushThread.setDaemon(true);
rePushThread.start();
rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true);
rePushEs.submit(rePushLoop);
// add contract event listener for subscribing
if (Args.getInstance().isEventSubscribe()) {
startEventSubscribing();
Thread triggerCapsuleProcessThread = new Thread(triggerCapsuleProcessLoop);
triggerCapsuleProcessThread.setDaemon(true);
triggerCapsuleProcessThread.start();
triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true);
triggerEs.submit(triggerCapsuleProcessLoop);
}

// start json rpc filter process
if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) {
Thread filterProcessThread = new Thread(filterProcessLoop);
filterProcessThread.start();
filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName);
filterEs.submit(filterProcessLoop);
}

//initStoreFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.JsonUtil;
Expand All @@ -27,28 +27,22 @@ public class NodePersistService {
private final boolean isNodePersist = CommonParameter.getInstance().isNodeDiscoveryPersist();
@Autowired
private CommonStore commonStore;
private Timer nodePersistTaskTimer;

private ScheduledExecutorService nodePersistExecutor;

private final String name = "NodePersistTask";

public void init() {
if (isNodePersist) {
nodePersistTaskTimer = new Timer("NodePersistTaskTimer");
nodePersistTaskTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
dbWrite();
}
}, DB_COMMIT_RATE, DB_COMMIT_RATE);
nodePersistExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(name);
nodePersistExecutor.scheduleAtFixedRate(this::dbWrite, DB_COMMIT_RATE, DB_COMMIT_RATE,
TimeUnit.MILLISECONDS);
}
}

public void close() {
if (Objects.isNull(nodePersistTaskTimer)) {
return;
}
try {
nodePersistTaskTimer.cancel();
} catch (Exception e) {
logger.error("Close nodePersistTaskTimer failed", e);
if (isNodePersist) {
ExecutorServiceManager.shutdownAndAwaitTermination(nodePersistExecutor, name);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.tron.common.backup;

import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.tron.common.backup.socket.BackupServer;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.Constant;
import org.tron.core.config.args.Args;


public class BackupServerTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private BackupServer backupServer;

@Before
public void setUp() throws Exception {
Args.setParam(new String[]{"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF);
CommonParameter.getInstance().setBackupPort(80);
List<String> members = new ArrayList<>();
members.add("127.0.0.2");
CommonParameter.getInstance().setBackupMembers(members);
BackupManager backupManager = new BackupManager();
backupManager.init();
backupServer = new BackupServer(backupManager);
}

@After
public void tearDown() {
backupServer.close();
Args.clearParam();
}

@Test
public void test() throws InterruptedException {
backupServer.initServer();
}
}