From eab8d2cc37105172e214f694bc059d2602552dff Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Thu, 8 Jun 2023 14:00:54 +0800 Subject: [PATCH 1/7] feat(db): Optimize for bloomFilter initialization --- .../org/tron/core/db2/common/TxCacheDB.java | 168 +++++++++++++++++- 1 file changed, 166 insertions(+), 2 deletions(-) diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index 71cf361b06a..ce98c3db5ec 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -3,10 +3,24 @@ import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import com.google.common.primitives.Longs; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.bouncycastle.util.encoders.Hex; @@ -42,6 +56,7 @@ public class TxCacheDB implements DB, Flusher { private BloomFilter[] bloomFilters = new BloomFilter[2]; // filterStartBlock record the start block of the active filter private volatile long filterStartBlock = INVALID_BLOCK; + private volatile long currentBlockNum = INVALID_BLOCK; // currentFilterIndex records the index of the active filter private volatile int currentFilterIndex = 0; @@ -110,6 +125,9 @@ private void initCache() { } public void init() { + if (recovery()) { + return; + } long size = recentTransactionStore.size(); if (size != MAX_BLOCK_SIZE) { // 0. load from persistentStore @@ -172,7 +190,7 @@ public void put(byte[] key, byte[] value) { MAX_BLOCK_SIZE * TRANSACTION_COUNT); } bloomFilters[currentFilterIndex].put(key); - + currentBlockNum = blockNum; if (lastMetricBlock != blockNum) { lastMetricBlock = blockNum; Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE, @@ -214,7 +232,7 @@ public void flush(Map batch) { @Override public void close() { - reset(); + dump(); bloomFilters[0] = null; bloomFilters[1] = null; persistentStore.close(); @@ -224,6 +242,152 @@ public void close() { public void reset() { } + private boolean recovery() { + logger.info("recovery bloomFilters start."); + final Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_0"); + final Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_1"); + Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "txCache.properties"); + Map properties = readProperties(txCacheProperties); + + if (properties == null || properties.size() != 3) { + logger.info("properties is corrupted."); + try { + Files.deleteIfExists(file0); + Files.deleteIfExists(file1); + } catch (IOException e) { + logger.warn("recovery bloomFilters failed. {}", e.getMessage()); + } + logger.info("rollback to previous mode."); + return false; + } + + filterStartBlock = Long.parseLong(properties.get("filterStartBlock")); + currentBlockNum = Long.parseLong(properties.get("currentBlockNum")); + currentFilterIndex = Integer.parseInt(properties.get("currentFilterIndex")); + CompletableFuture tk0 = CompletableFuture.supplyAsync(() -> recovery(0, file0)); + CompletableFuture tk1 = CompletableFuture.supplyAsync(() -> recovery(1, file1)); + + return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", + filterStartBlock, currentBlockNum, currentFilterIndex); + logger.info("recovery bloomFilters success."); + return true; + }).exceptionally(e -> { + bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + logger.warn("recovery bloomFilters failed. {}", e.getMessage()); + logger.info("rollback to previous mode."); + return false; + }).join(); + } + + private boolean recovery(int index, Path file) { + try (InputStream in = new BufferedInputStream(Files.newInputStream(file, + StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) { + logger.info("recovery bloomFilter[{}] from file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel()); + logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + return true; + } catch (IOException e) { + logger.warn("recovery bloomFilter[{}] failed.", index, e); + throw new RuntimeException(e); + } + } + + private void dump() { + logger.info("dump bloomFilters start."); + Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_0"); + Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_1"); + Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "txCache.properties"); + CompletableFuture task0 = CompletableFuture.runAsync(() -> dump(0, file0)); + CompletableFuture task1 = CompletableFuture.runAsync(() -> dump(1, file1)); + CompletableFuture.allOf(task0, task1).thenRun(() -> { + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", + filterStartBlock, currentBlockNum, currentFilterIndex); + Map properties = new HashMap<>(); + properties.put("filterStartBlock", String.valueOf(filterStartBlock)); + properties.put("currentBlockNum", String.valueOf(currentBlockNum)); + properties.put("currentFilterIndex", String.valueOf(currentFilterIndex)); + writeProperties(txCacheProperties, properties); + logger.info("dump bloomFilters done."); + + }).exceptionally(e -> { + logger.warn("dump bloomFilters to file failed. {}", e.getMessage()); + return null; + }).join(); + } + + private void dump(int index, Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + logger.warn("dump bloomFilters[{}] failed.", index, e); + throw new RuntimeException(e); + } + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file, + StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))) { + logger.info("dump bloomFilters[{}] to file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index].writeTo(out); + logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + } catch (IOException e) { + logger.warn("dump bloomFilters[{}] failed. {}", index, e.getMessage()); + throw new RuntimeException(e); + } + } + + private Map readProperties(Path file) { + try (BufferedReader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) { + Properties prop = new Properties(); + prop.load(r); + HashMap map = new HashMap<>(); + prop.forEach((k, v) -> { + String key = new String(k.toString().getBytes(StandardCharsets.ISO_8859_1), + StandardCharsets.UTF_8); + String value = new String(v.toString().getBytes(StandardCharsets.ISO_8859_1), + StandardCharsets.UTF_8); + map.put(key, value); + }); + map.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty()); + return map; + } catch (IOException e) { + logger.warn("readProperties. {}", e.getMessage()); + return null; + } finally { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + logger.warn("readProperties. {}", e.getMessage()); + } + } + } + + private boolean writeProperties(Path file, Map kvMap) { + try (BufferedWriter w = Files.newBufferedWriter(file); + BufferedReader r = Files.newBufferedReader(file)) { + Properties properties = new Properties(); + properties.load(r); + kvMap.forEach(properties::setProperty); + properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! "); + } catch (IOException e) { + throw new RuntimeException(e); + } + return true; + } + @Override public TxCacheDB newInstance() { return new TxCacheDB(name, recentTransactionStore); From 080e76c80283020e7aff5c5fec363d600efb86b8 Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Tue, 8 Aug 2023 09:49:26 +0800 Subject: [PATCH 2/7] fix bloomFilter persistence --- chainbase/src/main/java/org/tron/core/ChainBaseManager.java | 5 +++++ .../src/main/java/org/tron/core/db/TransactionCache.java | 0 2 files changed, 5 insertions(+) rename {framework => chainbase}/src/main/java/org/tron/core/db/TransactionCache.java (100%) diff --git a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java index adf66527499..d94d52f168b 100644 --- a/chainbase/src/main/java/org/tron/core/ChainBaseManager.java +++ b/chainbase/src/main/java/org/tron/core/ChainBaseManager.java @@ -28,6 +28,7 @@ import org.tron.core.db.PbftSignDataStore; import org.tron.core.db.RecentBlockStore; import org.tron.core.db.RecentTransactionStore; +import org.tron.core.db.TransactionCache; import org.tron.core.db.TransactionStore; import org.tron.core.db2.core.ITronChainBase; import org.tron.core.exception.BadItemException; @@ -237,6 +238,9 @@ public class ChainBaseManager { @Autowired private DbStatService dbStatService; + @Autowired + private TransactionCache transactionCache; + @Getter @Setter private NodeType nodeType; @@ -291,6 +295,7 @@ public void closeAllStore() { closeOneStore(pbftSignDataStore); closeOneStore(sectionBloomStore); closeOneStore(accountAssetStore); + closeOneStore(transactionCache); } // for test only diff --git a/framework/src/main/java/org/tron/core/db/TransactionCache.java b/chainbase/src/main/java/org/tron/core/db/TransactionCache.java similarity index 100% rename from framework/src/main/java/org/tron/core/db/TransactionCache.java rename to chainbase/src/main/java/org/tron/core/db/TransactionCache.java From 11c26baa7c91b2b121f7f5e4adc069019ca39bfc Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Tue, 8 Aug 2023 16:43:36 +0800 Subject: [PATCH 3/7] add test --- .../org/tron/core/db/TxCacheDBInitTest.java | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java diff --git a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java new file mode 100644 index 00000000000..3467199817f --- /dev/null +++ b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java @@ -0,0 +1,90 @@ +package org.tron.core.db; + +import lombok.extern.slf4j.Slf4j; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.ByteArray; +import org.tron.core.Constant; +import org.tron.core.capsule.BytesCapsule; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.keystore.Wallet; + +import java.io.IOException; + +@Slf4j +public class TxCacheDBInitTest { + + private static TronApplicationContext context; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private final byte[][] hash = new byte[140000][64]; + + @AfterClass + public static void destroy() { + context.destroy(); + Args.clearParam(); + } + + /** + * Init data. + */ + @BeforeClass + public static void init() throws IOException { + Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(), + "--p2p-disable", "true"}, + Constant.TEST_CONF); + context = new TronApplicationContext(DefaultConfig.class); + } + + @Test + public void reload(){ + putTransaction(); + DefaultListableBeanFactory defaultListableBeanFactory = + (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory(); + queryTransaction(); + defaultListableBeanFactory.destroySingleton("transactionCache"); + TransactionCache transactionCache = new TransactionCache("transactionCache", + context.getBean(RecentTransactionStore.class)); + transactionCache.initCache(); + defaultListableBeanFactory.registerSingleton("transactionCache",transactionCache); + queryTransaction(); + } + + private void putTransaction() { + TransactionCache db = context.getBean(TransactionCache.class); + for (int i = 1; i < 140000; i++) { + hash[i] = Wallet.generateRandomBytes(64); + db.put(hash[i], new BytesCapsule(ByteArray.fromLong(i))); + } + } + + private void queryTransaction() { + TransactionCache db = context.getBean(TransactionCache.class); + // [1,65537] are expired + for (int i = 1; i < 65538; i++) { + try { + Assert.assertFalse("index = " + i, db.has(hash[i])); + } catch (Exception e) { + Assert.fail("transaction should be expired index = " + i); + } + } + // [65538,140000] are in cache + for (int i = 65538; i < 140000; i++) { + try { + Assert.assertTrue("index = " + i, db.has(hash[i])); + } catch (Exception e) { + Assert.fail("transaction should not be expired index = " + i); + } + } + } + +} \ No newline at end of file From 00fac097d00bc0c60f5e47be32aba0a629c906c8 Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Tue, 8 Aug 2023 16:57:40 +0800 Subject: [PATCH 4/7] fix checkStyle --- .../src/test/java/org/tron/core/db/TxCacheDBInitTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java index 3467199817f..3da8337ee71 100644 --- a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java +++ b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java @@ -1,5 +1,6 @@ package org.tron.core.db; +import java.io.IOException; import lombok.extern.slf4j.Slf4j; import org.junit.AfterClass; import org.junit.Assert; @@ -16,8 +17,6 @@ import org.tron.core.config.args.Args; import org.tron.keystore.Wallet; -import java.io.IOException; - @Slf4j public class TxCacheDBInitTest { @@ -40,13 +39,12 @@ public static void destroy() { @BeforeClass public static void init() throws IOException { Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(), - "--p2p-disable", "true"}, - Constant.TEST_CONF); + "--p2p-disable", "true"}, Constant.TEST_CONF); context = new TronApplicationContext(DefaultConfig.class); } @Test - public void reload(){ + public void reload() { putTransaction(); DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory(); From 2c6fdac162312b6b0e7befcb4154807733755f33 Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Mon, 14 Aug 2023 21:11:30 +0800 Subject: [PATCH 5/7] hide cache-related files --- .../org/tron/core/db2/common/TxCacheDB.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index ce98c3db5ec..381d337a386 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -31,6 +31,7 @@ import org.tron.common.storage.leveldb.LevelDbDataSourceImpl; import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl; import org.tron.common.utils.ByteArray; +import org.tron.common.utils.FileUtil; import org.tron.common.utils.JsonUtil; import org.tron.common.utils.StorageUtils; import org.tron.core.capsule.BytesCapsule; @@ -72,6 +73,11 @@ public class TxCacheDB implements DB, Flusher { // replace persistentStore and optimizes startup performance private RecentTransactionStore recentTransactionStore; + private final Path cacheFile0; + private final Path cacheFile1; + private final Path cacheProperties; + private final Path cacheDir; + public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { this.name = name; this.TRANSACTION_COUNT = @@ -100,6 +106,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { MAX_BLOCK_SIZE * TRANSACTION_COUNT); this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), MAX_BLOCK_SIZE * TRANSACTION_COUNT); + cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache"); + this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0"); + this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1"); + this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties"); } @@ -243,20 +253,15 @@ public void reset() { } private boolean recovery() { + FileUtil.createDirIfNotExists(this.cacheDir.toString()); logger.info("recovery bloomFilters start."); - final Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "bloomFilters_0"); - final Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "bloomFilters_1"); - Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "txCache.properties"); - Map properties = readProperties(txCacheProperties); + Map properties = readProperties(this.cacheProperties); if (properties == null || properties.size() != 3) { logger.info("properties is corrupted."); try { - Files.deleteIfExists(file0); - Files.deleteIfExists(file1); + Files.deleteIfExists(this.cacheFile0); + Files.deleteIfExists(this.cacheFile1); } catch (IOException e) { logger.warn("recovery bloomFilters failed. {}", e.getMessage()); } @@ -267,8 +272,10 @@ private boolean recovery() { filterStartBlock = Long.parseLong(properties.get("filterStartBlock")); currentBlockNum = Long.parseLong(properties.get("currentBlockNum")); currentFilterIndex = Integer.parseInt(properties.get("currentFilterIndex")); - CompletableFuture tk0 = CompletableFuture.supplyAsync(() -> recovery(0, file0)); - CompletableFuture tk1 = CompletableFuture.supplyAsync(() -> recovery(1, file1)); + CompletableFuture tk0 = CompletableFuture.supplyAsync( + () -> recovery(0, this.cacheFile0)); + CompletableFuture tk1 = CompletableFuture.supplyAsync( + () -> recovery(1, this.cacheFile1)); return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", @@ -303,15 +310,12 @@ private boolean recovery(int index, Path file) { } private void dump() { + FileUtil.createDirIfNotExists(this.cacheDir.toString()); logger.info("dump bloomFilters start."); - Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "bloomFilters_0"); - Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "bloomFilters_1"); - Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), - "txCache.properties"); - CompletableFuture task0 = CompletableFuture.runAsync(() -> dump(0, file0)); - CompletableFuture task1 = CompletableFuture.runAsync(() -> dump(1, file1)); + CompletableFuture task0 = CompletableFuture.runAsync( + () -> dump(0, this.cacheFile0)); + CompletableFuture task1 = CompletableFuture.runAsync( + () -> dump(1, this.cacheFile1)); CompletableFuture.allOf(task0, task1).thenRun(() -> { logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", filterStartBlock, currentBlockNum, currentFilterIndex); @@ -319,7 +323,7 @@ private void dump() { properties.put("filterStartBlock", String.valueOf(filterStartBlock)); properties.put("currentBlockNum", String.valueOf(currentBlockNum)); properties.put("currentFilterIndex", String.valueOf(currentFilterIndex)); - writeProperties(txCacheProperties, properties); + writeProperties(this.cacheProperties, properties); logger.info("dump bloomFilters done."); }).exceptionally(e -> { From 6cd5190b7549e8677aeaffd328a59aaf46b9935d Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Tue, 15 Aug 2023 00:03:55 +0800 Subject: [PATCH 6/7] redesign exception handling --- .../org/tron/core/db2/common/TxCacheDB.java | 138 +++++++----------- 1 file changed, 52 insertions(+), 86 deletions(-) diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index 381d337a386..b681e8a6a26 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -5,17 +5,16 @@ import com.google.common.primitives.Longs; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -255,42 +254,16 @@ public void reset() { private boolean recovery() { FileUtil.createDirIfNotExists(this.cacheDir.toString()); logger.info("recovery bloomFilters start."); - Map properties = readProperties(this.cacheProperties); - - if (properties == null || properties.size() != 3) { - logger.info("properties is corrupted."); - try { - Files.deleteIfExists(this.cacheFile0); - Files.deleteIfExists(this.cacheFile1); - } catch (IOException e) { - logger.warn("recovery bloomFilters failed. {}", e.getMessage()); - } - logger.info("rollback to previous mode."); - return false; - } - - filterStartBlock = Long.parseLong(properties.get("filterStartBlock")); - currentBlockNum = Long.parseLong(properties.get("currentBlockNum")); - currentFilterIndex = Integer.parseInt(properties.get("currentFilterIndex")); - CompletableFuture tk0 = CompletableFuture.supplyAsync( - () -> recovery(0, this.cacheFile0)); - CompletableFuture tk1 = CompletableFuture.supplyAsync( - () -> recovery(1, this.cacheFile1)); + CompletableFuture loadProperties = CompletableFuture.supplyAsync(this::loadProperties); + CompletableFuture tk0 = loadProperties.thenApplyAsync( + v -> recovery(0, this.cacheFile0)); + CompletableFuture tk1 = loadProperties.thenApplyAsync( + v -> recovery(1, this.cacheFile1)); return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { - logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", - filterStartBlock, currentBlockNum, currentFilterIndex); logger.info("recovery bloomFilters success."); return true; - }).exceptionally(e -> { - bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), - MAX_BLOCK_SIZE * TRANSACTION_COUNT); - bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), - MAX_BLOCK_SIZE * TRANSACTION_COUNT); - logger.warn("recovery bloomFilters failed. {}", e.getMessage()); - logger.info("rollback to previous mode."); - return false; - }).join(); + }).exceptionally(this::handleException).join(); } private boolean recovery(int index, Path file) { @@ -303,12 +276,27 @@ private boolean recovery(int index, Path file) { index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), System.currentTimeMillis() - start); return true; - } catch (IOException e) { - logger.warn("recovery bloomFilter[{}] failed.", index, e); + } catch (Exception e) { throw new RuntimeException(e); } } + private boolean handleException(Throwable e) { + bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + try { + Files.deleteIfExists(this.cacheFile0); + Files.deleteIfExists(this.cacheFile1); + } catch (Exception ignored) { + + } + logger.info("recovery bloomFilters failed. {}", e.getMessage()); + logger.info("rollback to previous mode."); + return false; + } + private void dump() { FileUtil.createDirIfNotExists(this.cacheDir.toString()); logger.info("dump bloomFilters start."); @@ -317,79 +305,57 @@ private void dump() { CompletableFuture task1 = CompletableFuture.runAsync( () -> dump(1, this.cacheFile1)); CompletableFuture.allOf(task0, task1).thenRun(() -> { - logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", - filterStartBlock, currentBlockNum, currentFilterIndex); - Map properties = new HashMap<>(); - properties.put("filterStartBlock", String.valueOf(filterStartBlock)); - properties.put("currentBlockNum", String.valueOf(currentBlockNum)); - properties.put("currentFilterIndex", String.valueOf(currentFilterIndex)); - writeProperties(this.cacheProperties, properties); + writeProperties(); logger.info("dump bloomFilters done."); }).exceptionally(e -> { - logger.warn("dump bloomFilters to file failed. {}", e.getMessage()); + logger.info("dump bloomFilters to file failed. {}", e.getMessage()); return null; }).join(); } private void dump(int index, Path file) { - try { - Files.deleteIfExists(file); - } catch (IOException e) { - logger.warn("dump bloomFilters[{}] failed.", index, e); - throw new RuntimeException(e); - } - try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file, - StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))) { + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) { logger.info("dump bloomFilters[{}] to file.", index); long start = System.currentTimeMillis(); bloomFilters[index].writeTo(out); logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.", index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), System.currentTimeMillis() - start); - } catch (IOException e) { - logger.warn("dump bloomFilters[{}] failed. {}", index, e.getMessage()); + } catch (Exception e) { throw new RuntimeException(e); } } - private Map readProperties(Path file) { - try (BufferedReader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) { - Properties prop = new Properties(); - prop.load(r); - HashMap map = new HashMap<>(); - prop.forEach((k, v) -> { - String key = new String(k.toString().getBytes(StandardCharsets.ISO_8859_1), - StandardCharsets.UTF_8); - String value = new String(v.toString().getBytes(StandardCharsets.ISO_8859_1), - StandardCharsets.UTF_8); - map.put(key, value); - }); - map.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty()); - return map; - } catch (IOException e) { - logger.warn("readProperties. {}", e.getMessage()); - return null; - } finally { - try { - Files.deleteIfExists(file); - } catch (IOException e) { - logger.warn("readProperties. {}", e.getMessage()); - } + private boolean loadProperties() { + try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream( + this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)), + StandardCharsets.UTF_8)) { + Properties properties = new Properties(); + properties.load(r); + filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock")); + currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum")); + currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex")); + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.", + filterStartBlock, currentBlockNum, currentFilterIndex); + return true; + } catch (Exception e) { + throw new RuntimeException(e); } } - private boolean writeProperties(Path file, Map kvMap) { - try (BufferedWriter w = Files.newBufferedWriter(file); - BufferedReader r = Files.newBufferedReader(file)) { + private void writeProperties() { + try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) { Properties properties = new Properties(); - properties.load(r); - kvMap.forEach(properties::setProperty); + properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock)); + properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum)); + properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex)); properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! "); - } catch (IOException e) { + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.", + filterStartBlock, currentBlockNum, currentFilterIndex); + } catch (Exception e) { throw new RuntimeException(e); } - return true; } @Override From 7d1eac9cd63f49544f38fec3e83facea9bb4cb07 Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Tue, 15 Aug 2023 01:09:59 +0800 Subject: [PATCH 7/7] avoid dumping a wrong cached data --- .../main/java/org/tron/core/db2/common/TxCacheDB.java | 11 ++++++++++- .../test/java/org/tron/core/db/TxCacheDBInitTest.java | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index b681e8a6a26..9923a876cad 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -20,6 +20,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.bouncycastle.util.encoders.Hex; @@ -76,6 +77,7 @@ public class TxCacheDB implements DB, Flusher { private final Path cacheFile1; private final Path cacheProperties; private final Path cacheDir; + private AtomicBoolean isValid = new AtomicBoolean(false); public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { this.name = name; @@ -135,6 +137,7 @@ private void initCache() { public void init() { if (recovery()) { + isValid.set(true); return; } long size = recentTransactionStore.size(); @@ -156,6 +159,7 @@ public void init() { logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.", bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(), System.currentTimeMillis() - start); + isValid.set(true); } @Override @@ -235,8 +239,10 @@ public Iterator> iterator() { } @Override - public void flush(Map batch) { + public synchronized void flush(Map batch) { + isValid.set(false); batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes())); + isValid.set(true); } @Override @@ -298,6 +304,9 @@ private boolean handleException(Throwable e) { } private void dump() { + if (!isValid.get()) { + logger.info("bloomFilters is not valid."); + } FileUtil.createDirIfNotExists(this.cacheDir.toString()); logger.info("dump bloomFilters start."); CompletableFuture task0 = CompletableFuture.runAsync( diff --git a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java index 3da8337ee71..c3cb7cb2eb6 100644 --- a/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java +++ b/framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java @@ -45,6 +45,8 @@ public static void init() throws IOException { @Test public void reload() { + TransactionCache db = context.getBean(TransactionCache.class); + db.initCache(); putTransaction(); DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();