diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index cd1a61c01fe..cc3e5d0c0e9 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -1033,23 +1033,6 @@ public void eraseBlock() { } } - public void pushVerifiedBlock(BlockCapsule block) throws ContractValidateException, - ContractExeException, ValidateSignatureException, AccountResourceInsufficientException, - TransactionExpirationException, TooBigTransactionException, DupTransactionException, - TaposException, ValidateScheduleException, ReceiptCheckErrException, - VMIllegalException, TooBigTransactionResultException, UnLinkedBlockException, - NonCommonBlockException, BadNumberBlockException, BadBlockException, ZksnarkException, - EventBloomException { - block.generatedByMyself = true; - long start = System.currentTimeMillis(); - pushBlock(block); - logger.info("Push block cost: {} ms, blockNum: {}, blockHash: {}, trx count: {}.", - System.currentTimeMillis() - start, - block.getNum(), - block.getBlockId(), - block.getTransactions().size()); - } - private void applyBlock(BlockCapsule block) throws ContractValidateException, ContractExeException, ValidateSignatureException, AccountResourceInsufficientException, TransactionExpirationException, TooBigTransactionException, DupTransactionException, diff --git a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java index 100bad179bf..4fcdfc5e667 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java +++ b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java @@ -233,6 +233,17 @@ public Message getData(Sha256Hash hash, InventoryType type) throws P2pException } } + public void pushVerifiedBlock(BlockCapsule block) throws P2pException { + block.generatedByMyself = true; + long start = System.currentTimeMillis(); + processBlock(block, true); + logger.info("Push block cost: {} ms, blockNum: {}, blockHash: {}, trx count: {}.", + System.currentTimeMillis() - start, + block.getNum(), + block.getBlockId(), + block.getTransactions().size()); + } + public void processBlock(BlockCapsule block, boolean isSync) throws P2pException { if (!hitDown && dbManager.getLatestSolidityNumShutDown() > 0 && dbManager.getLatestSolidityNumShutDown() == dbManager.getDynamicPropertiesStore() diff --git a/framework/src/main/java/org/tron/program/FullNode.java b/framework/src/main/java/org/tron/program/FullNode.java index 95257d77f8e..a7d204de025 100644 --- a/framework/src/main/java/org/tron/program/FullNode.java +++ b/framework/src/main/java/org/tron/program/FullNode.java @@ -2,6 +2,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.util.ObjectUtils; import org.tron.common.application.Application; import org.tron.common.application.ApplicationFactory; import org.tron.common.application.TronApplicationContext; @@ -28,19 +29,24 @@ public static void main(String[] args) { LogService.load(parameter.getLogbackPath()); - if (parameter.isSolidityNode()) { - SolidityNode.start(); - return; - } if (parameter.isKeystoreFactory()) { KeystoreFactory.start(); return; } - logger.info("Full node running."); - if (Args.getInstance().isDebug()) { - logger.info("in debug mode, it won't check energy time"); + if (parameter.isSolidityNode()) { + logger.info("Solidity node is running."); + if (ObjectUtils.isEmpty(parameter.getTrustNodeAddr())) { + throw new TronError(new IllegalArgumentException("Trust node is not set."), + TronError.ErrCode.SOLID_NODE_INIT); + } + parameter.setP2pDisable(true); } else { - logger.info("not in debug mode, it will check energy time"); + logger.info("Full node running."); + if (Args.getInstance().isDebug()) { + logger.info("in debug mode, it won't check energy time"); + } else { + logger.info("not in debug mode, it will check energy time"); + } } // init metrics first @@ -55,6 +61,10 @@ public static void main(String[] args) { Application appT = ApplicationFactory.create(context); context.registerShutdownHook(); appT.startup(); + if (parameter.isSolidityNode()) { + SolidityNode node = context.getBean(SolidityNode.class); + node.run(); + } appT.blockUntilShutdown(); } diff --git a/framework/src/main/java/org/tron/program/SolidityNode.java b/framework/src/main/java/org/tron/program/SolidityNode.java index 3367141e2a5..e13f94b8bfe 100644 --- a/framework/src/main/java/org/tron/program/SolidityNode.java +++ b/framework/src/main/java/org/tron/program/SolidityNode.java @@ -1,196 +1,215 @@ -package org.tron.program; - -import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL; - -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicLong; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.support.DefaultListableBeanFactory; -import org.springframework.util.ObjectUtils; -import org.tron.common.application.Application; -import org.tron.common.application.ApplicationFactory; -import org.tron.common.application.TronApplicationContext; -import org.tron.common.client.DatabaseGrpcClient; -import org.tron.common.parameter.CommonParameter; -import org.tron.common.prometheus.Metrics; -import org.tron.core.ChainBaseManager; -import org.tron.core.capsule.BlockCapsule; -import org.tron.core.config.DefaultConfig; -import org.tron.core.db.Manager; -import org.tron.core.exception.TronError; -import org.tron.protos.Protocol.Block; - -@Slf4j(topic = "app") -public class SolidityNode { - - private Manager dbManager; - - private ChainBaseManager chainBaseManager; - - private DatabaseGrpcClient databaseGrpcClient; - - private AtomicLong ID = new AtomicLong(); - - private AtomicLong remoteBlockNum = new AtomicLong(); - - private LinkedBlockingDeque blockQueue = new LinkedBlockingDeque<>(100); - - private int exceptionSleepTime = 1000; - - private volatile boolean flag = true; - - public SolidityNode(Manager dbManager) { - this.dbManager = dbManager; - this.chainBaseManager = dbManager.getChainBaseManager(); - resolveCompatibilityIssueIfUsingFullNodeDatabase(); - ID.set(chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum()); - databaseGrpcClient = new DatabaseGrpcClient(CommonParameter.getInstance().getTrustNodeAddr()); - remoteBlockNum.set(getLastSolidityBlockNum()); - } - - /** - * Start the SolidityNode. - */ - public static void start() { - logger.info("Solidity node is running."); - CommonParameter parameter = CommonParameter.getInstance(); - if (ObjectUtils.isEmpty(parameter.getTrustNodeAddr())) { - throw new TronError(new IllegalArgumentException("Trust node is not set."), - TronError.ErrCode.SOLID_NODE_INIT); - } - // init metrics first - Metrics.init(); - - DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory(); - beanFactory.setAllowCircularReferences(false); - TronApplicationContext context = - new TronApplicationContext(beanFactory); - context.register(DefaultConfig.class); - context.refresh(); - Application appT = ApplicationFactory.create(context); - context.registerShutdownHook(); - appT.startup(); - SolidityNode node = new SolidityNode(appT.getDbManager()); - node.run(); - appT.blockUntilShutdown(); - } - - private void run() { - try { - new Thread(this::getBlock).start(); - new Thread(this::processBlock).start(); - logger.info("Success to start solid node, ID: {}, remoteBlockNum: {}.", ID.get(), - remoteBlockNum); - } catch (Exception e) { - logger.error("Failed to start solid node, address: {}.", - CommonParameter.getInstance().getTrustNodeAddr()); - throw new TronError(e, TronError.ErrCode.SOLID_NODE_INIT); - } - } - - private void getBlock() { - long blockNum = ID.incrementAndGet(); - while (flag) { - try { - if (blockNum > remoteBlockNum.get()) { - sleep(BLOCK_PRODUCED_INTERVAL); - remoteBlockNum.set(getLastSolidityBlockNum()); - continue; - } - Block block = getBlockByNum(blockNum); - blockQueue.put(block); - blockNum = ID.incrementAndGet(); - } catch (Exception e) { - logger.error("Failed to get block {}, reason: {}.", blockNum, e.getMessage()); - sleep(exceptionSleepTime); - } - } - } - - private void processBlock() { - while (flag) { - try { - Block block = blockQueue.take(); - loopProcessBlock(block); - } catch (Exception e) { - logger.error(e.getMessage()); - sleep(exceptionSleepTime); - } - } - } - - private void loopProcessBlock(Block block) { - while (flag) { - long blockNum = block.getBlockHeader().getRawData().getNumber(); - try { - dbManager.pushVerifiedBlock(new BlockCapsule(block)); - chainBaseManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(blockNum); - logger - .info("Success to process block: {}, blockQueueSize: {}.", blockNum, blockQueue.size()); - return; - } catch (Exception e) { - logger.error("Failed to process block {}.", new BlockCapsule(block), e); - sleep(exceptionSleepTime); - block = getBlockByNum(blockNum); - } - } - } - - private Block getBlockByNum(long blockNum) { - while (true) { - try { - long time = System.currentTimeMillis(); - Block block = databaseGrpcClient.getBlock(blockNum); - long num = block.getBlockHeader().getRawData().getNumber(); - if (num == blockNum) { - logger.info("Success to get block: {}, cost: {}ms.", - blockNum, System.currentTimeMillis() - time); - return block; - } else { - logger.warn("Get block id not the same , {}, {}.", num, blockNum); - sleep(exceptionSleepTime); - } - } catch (Exception e) { - logger.error("Failed to get block: {}, reason: {}.", blockNum, e.getMessage()); - sleep(exceptionSleepTime); - } - } - } - - private long getLastSolidityBlockNum() { - while (true) { - try { - long time = System.currentTimeMillis(); - long blockNum = databaseGrpcClient.getDynamicProperties().getLastSolidityBlockNum(); - logger.info("Get last remote solid blockNum: {}, remoteBlockNum: {}, cost: {}.", - blockNum, remoteBlockNum, System.currentTimeMillis() - time); - return blockNum; - } catch (Exception e) { - logger.error("Failed to get last solid blockNum: {}, reason: {}.", remoteBlockNum.get(), - e.getMessage()); - sleep(exceptionSleepTime); - } - } - } - - public void sleep(long time) { - try { - Thread.sleep(time); - } catch (Exception e1) { - logger.error(e1.getMessage()); - } - } - - private void resolveCompatibilityIssueIfUsingFullNodeDatabase() { - long lastSolidityBlockNum = - chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum(); - long headBlockNum = chainBaseManager.getHeadBlockNum(); - logger.info("headBlockNum:{}, solidityBlockNum:{}, diff:{}", - headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum); - if (lastSolidityBlockNum < headBlockNum) { - logger.info("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}", - headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum); - chainBaseManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(headBlockNum); - } - } -} \ No newline at end of file +package org.tron.program; + +import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.core.type.AnnotatedTypeMetadata; +import org.springframework.stereotype.Component; +import org.tron.common.client.DatabaseGrpcClient; +import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.ChainBaseManager; +import org.tron.core.capsule.BlockCapsule; +import org.tron.core.config.args.Args; +import org.tron.core.exception.TronError; +import org.tron.core.net.TronNetDelegate; +import org.tron.protos.Protocol.Block; + +@Slf4j(topic = "app") +@Conditional(SolidityNode.SolidityCondition.class) +@Component +public class SolidityNode { + + @Autowired + private ChainBaseManager chainBaseManager; + + @Autowired + private TronNetDelegate tronNetDelegate; + + private DatabaseGrpcClient databaseGrpcClient; + + private final AtomicLong ID = new AtomicLong(); + + private final AtomicLong remoteBlockNum = new AtomicLong(); + + private final LinkedBlockingDeque blockQueue = new LinkedBlockingDeque<>(100); + + private final int exceptionSleepTime = 1000; + + private volatile boolean flag = true; + + private final String getBlockName = "get-block"; + private final String processBlockName = "process-block"; + + private ExecutorService getBlockExecutor; + private ExecutorService processBlockExecutor; + + @PostConstruct + private void init() { + resolveCompatibilityIssueIfUsingFullNodeDatabase(); + ID.set(chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum()); + getBlockExecutor = ExecutorServiceManager.newSingleThreadExecutor(getBlockName); + processBlockExecutor = ExecutorServiceManager.newSingleThreadExecutor(processBlockName); + } + + @PreDestroy + private void shutdown() { + flag = false; + ExecutorServiceManager.shutdownAndAwaitTermination(getBlockExecutor, getBlockName); + ExecutorServiceManager.shutdownAndAwaitTermination(processBlockExecutor, processBlockName); + if (databaseGrpcClient != null) { + databaseGrpcClient.shutdown(); + } + } + + public void run() { + try { + databaseGrpcClient = new DatabaseGrpcClient(CommonParameter.getInstance().getTrustNodeAddr()); + remoteBlockNum.set(getLastSolidityBlockNum()); + + getBlockExecutor.submit(this::getBlock); + processBlockExecutor.submit(this::processSolidityBlock); + logger.info("Success to start solid node, ID: {}, remoteBlockNum: {}.", ID.get(), + remoteBlockNum); + } catch (Exception e) { + logger.error("Failed to start solid node, address: {}.", + CommonParameter.getInstance().getTrustNodeAddr()); + throw new TronError(e, TronError.ErrCode.SOLID_NODE_INIT); + } + } + + private void getBlock() { + long blockNum = ID.incrementAndGet(); + while (flag) { + try { + if (blockNum > remoteBlockNum.get()) { + sleep(BLOCK_PRODUCED_INTERVAL); + remoteBlockNum.set(getLastSolidityBlockNum()); + continue; + } + Block block = getBlockByNum(blockNum); + blockQueue.put(block); + blockNum = ID.incrementAndGet(); + } catch (Exception e) { + logger.error("Failed to get block {}, reason: {}.", blockNum, e.getMessage()); + sleep(exceptionSleepTime); + } + } + } + + private void processSolidityBlock() { + while (flag) { + try { + Block block = blockQueue.poll(exceptionSleepTime, TimeUnit.MILLISECONDS); + if (block == null) { + continue; + } + loopProcessBlock(block); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("processSolidityBlock interrupted."); + return; + } catch (Exception e) { + logger.error(e.getMessage()); + sleep(exceptionSleepTime); + } + } + } + + private void loopProcessBlock(Block block) { + while (flag) { + long blockNum = block.getBlockHeader().getRawData().getNumber(); + try { + tronNetDelegate.pushVerifiedBlock(new BlockCapsule(block)); + chainBaseManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(blockNum); + logger + .info("Success to process block: {}, blockQueueSize: {}.", blockNum, blockQueue.size()); + return; + } catch (Exception e) { + logger.error("Failed to process block {}.", new BlockCapsule(block), e); + sleep(exceptionSleepTime); + block = getBlockByNum(blockNum); + } + } + } + + private Block getBlockByNum(long blockNum) { + while (flag) { + try { + long time = System.currentTimeMillis(); + Block block = databaseGrpcClient.getBlock(blockNum); + long num = block.getBlockHeader().getRawData().getNumber(); + if (num == blockNum) { + logger.info("Success to get block: {}, cost: {}ms.", + blockNum, System.currentTimeMillis() - time); + return block; + } else { + logger.warn("Get block id not the same , {}, {}.", num, blockNum); + sleep(exceptionSleepTime); + } + } catch (Exception e) { + logger.error("Failed to get block: {}, reason: {}.", blockNum, e.getMessage()); + sleep(exceptionSleepTime); + } + } + throw new RuntimeException("SolidityNode is closing."); + } + + private long getLastSolidityBlockNum() { + while (flag) { + try { + long time = System.currentTimeMillis(); + long blockNum = databaseGrpcClient.getDynamicProperties().getLastSolidityBlockNum(); + logger.info("Get last remote solid blockNum: {}, remoteBlockNum: {}, cost: {}.", + blockNum, remoteBlockNum, System.currentTimeMillis() - time); + return blockNum; + } catch (Exception e) { + logger.error("Failed to get last solid blockNum: {}, reason: {}.", remoteBlockNum.get(), + e.getMessage()); + sleep(exceptionSleepTime); + } + } + throw new RuntimeException("SolidityNode is closing."); + } + + public void sleep(long time) { + try { + Thread.sleep(time); + } catch (Exception e1) { + logger.error(e1.getMessage()); + } + } + + private void resolveCompatibilityIssueIfUsingFullNodeDatabase() { + long lastSolidityBlockNum = + chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum(); + long headBlockNum = chainBaseManager.getHeadBlockNum(); + logger.info("headBlockNum:{}, solidityBlockNum:{}, diff:{}", + headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum); + if (lastSolidityBlockNum < headBlockNum) { + logger.info("use fullNode database, headBlockNum:{}, solidityBlockNum:{}, diff:{}", + headBlockNum, lastSolidityBlockNum, headBlockNum - lastSolidityBlockNum); + chainBaseManager.getDynamicPropertiesStore().saveLatestSolidifiedBlockNum(headBlockNum); + } + } + + static class SolidityCondition implements Condition { + + @Override + public boolean matches(@NonNull ConditionContext context, + @NonNull AnnotatedTypeMetadata metadata) { + return Args.getInstance().isSolidityNode(); + } + } +} diff --git a/framework/src/test/java/org/tron/core/db/ManagerTest.java b/framework/src/test/java/org/tron/core/db/ManagerTest.java index b9808b89193..e7c337723a5 100755 --- a/framework/src/test/java/org/tron/core/db/ManagerTest.java +++ b/framework/src/test/java/org/tron/core/db/ManagerTest.java @@ -314,12 +314,6 @@ public void transactionTest() { } catch (Exception e) { Assert.assertTrue(e instanceof TaposException); } - try { - dbManager.pushVerifiedBlock(chainManager.getHead()); - dbManager.getBlockChainHashesOnFork(chainManager.getHeadBlockId()); - } catch (Exception e) { - Assert.assertTrue(e instanceof TaposException); - } } @Test diff --git a/framework/src/test/java/org/tron/program/SolidityNodeTest.java b/framework/src/test/java/org/tron/program/SolidityNodeTest.java index 7d94f813b80..5cfdec41ebf 100755 --- a/framework/src/test/java/org/tron/program/SolidityNodeTest.java +++ b/framework/src/test/java/org/tron/program/SolidityNodeTest.java @@ -40,18 +40,6 @@ public class SolidityNodeTest extends BaseTest { Args.getInstance().setSolidityHttpPort(solidityHttpPort); } - @Test - public void testSolidityArgs() { - Assert.assertNotNull(Args.getInstance().getTrustNodeAddr()); - Assert.assertTrue(Args.getInstance().isSolidityNode()); - String trustNodeAddr = Args.getInstance().getTrustNodeAddr(); - Args.getInstance().setTrustNodeAddr(null); - TronError thrown = assertThrows(TronError.class, - SolidityNode::start); - assertEquals(TronError.ErrCode.SOLID_NODE_INIT, thrown.getErrCode()); - Args.getInstance().setTrustNodeAddr(trustNodeAddr); - } - @Test public void testSolidityGrpcCall() { rpcApiService.start();