From ba1dfda2c4d6e8b1668f2f30f66132125225cd4a Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Tue, 9 Apr 2024 12:56:54 +0530
Subject: [PATCH] HBASE-28468: Integration of time-based priority caching in eviction logic.

The time-based priority caching relies on the presence of file paths in the
block-cache key. However, in the case of the persistent cache, the file paths
are not written to the persistence file. Hence, when the region server is
restarted, the block-cache keys need to be repopulated with the file paths.

This change addresses the following:
1. Always populate the block-cache key with the file path during its creation.
2. Fetch the file paths corresponding to the file names in the block-cache keys during restarts.
3. Use the DataTieringManager APIs to evict cold-file blocks when the cache is full.

Change-Id: Ice19bd41064c73538ee3d3755057813a531b9171
---
 .../hbase/io/hfile/HFileBlockIndex.java | 13 ++--
 .../hbase/io/hfile/HFilePreadReader.java | 2 +-
 .../hbase/io/hfile/HFileReaderImpl.java | 2 +-
 .../hbase/io/hfile/HFileWriterImpl.java | 4 +-
 .../hbase/io/hfile/bucket/BucketCache.java | 15 ++++-
 .../io/hfile/bucket/BucketProtoUtils.java | 30 ++++++++-
 .../regionserver/DataTieringManager.java | 21 +++++++
 .../hbase/regionserver/HRegionServer.java | 2 +-
 .../io/hfile/TestCombinedBlockCache.java | 3 +-
 .../regionserver/TestDataTieringManager.java | 61 ++++++++++++++++++-
 10 files changed, 135 insertions(+), 18 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 592c19c866cf..d6ff22e17a7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -978,7 +979,7 @@ public static class BlockIndexWriter implements InlineBlockWriter {
 private CacheConfig cacheConf;
 /** Name to use for computing cache keys */
- private String nameForCaching;
+ private Path pathForCaching;
 /** Type of encoding used for index blocks in HFile */
 private HFileIndexBlockEncoder indexBlockEncoder;
@@ -995,15 +996,15 @@ public BlockIndexWriter() {
 * @param cacheConf used to determine when and how a block should be cached-on-write.
*/ public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf, - String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) { - if ((cacheConf == null) != (nameForCaching == null)) { + Path pathForCaching, HFileIndexBlockEncoder indexBlockEncoder) { + if ((cacheConf == null) != (pathForCaching == null)) { throw new IllegalArgumentException( "Block cache and file name for " + "caching must be both specified or both null"); } this.blockWriter = blockWriter; this.cacheConf = cacheConf; - this.nameForCaching = nameForCaching; + this.pathForCaching = pathForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES; this.indexBlockEncoder = @@ -1070,7 +1071,7 @@ public long writeIndexBlocks(FSDataOutputStream out) throws IOException { if (cacheConf != null) { cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); - cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true, + cache.cacheBlock(new BlockCacheKey(pathForCaching, rootLevelIndexPos, true, blockForCaching.getBlockType()), blockForCaching); }); } @@ -1162,7 +1163,7 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); cache.cacheBlock( - new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()), + new BlockCacheKey(pathForCaching, beginOffset, true, blockForCaching.getBlockType()), blockForCaching); }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 6063ffe68891..6d491a3aa4bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -79,7 +79,7 @@ public void run() { // so we check first if the block exists on its in-memory index, if so, we just // update the offset and move on to the next block without actually going read all // the way to the cache. - BlockCacheKey cacheKey = new BlockCacheKey(name, offset); + BlockCacheKey cacheKey = new BlockCacheKey(path, offset, true, BlockType.DATA); if (cache.isAlreadyCached(cacheKey).orElse(false)) { // Right now, isAlreadyCached is only supported by BucketCache, which should // always cache data blocks. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index e0f27af71458..e2eeb7a281ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1201,7 +1201,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws // Check cache for block. If found return. 
long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block); BlockCacheKey cacheKey = - new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META); + new BlockCacheKey(path, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META); cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory()); HFileBlock cachedBlock = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index d2dfaf62106a..786797c43908 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -315,7 +315,7 @@ protected void finishInit(final Configuration conf) { // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, - cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder); + cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? path : null, indexBlockEncoder); dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf)); dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf)); inlineBlockWriters.add(dataBlockIndexWriter); @@ -556,7 +556,7 @@ private void doCacheOnWrite(long offset) { cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); try { - cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), + cache.cacheBlock(new BlockCacheKey(path, offset, true, cacheFormatBlock.getBlockType()), cacheFormatBlock, cacheConf.isInMemory(), true); } finally { // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 71bfc757e51e..cc58d17c3b21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.regionserver.DataTieringManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -141,8 +142,8 @@ public class BucketCache implements BlockCache, HeapSize { /** Statistics thread */ private static final int statThreadPeriod = 5 * 60; - final static int DEFAULT_WRITER_THREADS = 3; - final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; + public final static int DEFAULT_WRITER_THREADS = 3; + public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; // Store/read block data transient final IOEngine ioEngine; @@ -935,6 +936,16 @@ void freeSpace(final String why) { } try { freeInProgress = true; + + // Check the list of files to determine the cold files which can be readily evicted. 
+ Set<String> coldFiles =
+ DataTieringManager.getInstance().getColdDataFiles(backingMap.keySet());
+ if (coldFiles != null) {
+ for (String fileName : coldFiles) {
+ evictBlocksByHfileName(fileName);
+ }
+ }
+
 long bytesToFreeWithoutExtra = 0;
 // Calculate free byte for each bucketSizeinfo
 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 4b42414fb9c5..44dc74929547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Function;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@@ -32,15 +33,20 @@
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 final class BucketProtoUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BucketProtoUtils.class);
 private BucketProtoUtils() {
 }
@@ -130,10 +136,30 @@ static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>>
 ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
 NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
 .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
+
+ Map<String, Path> allFilePaths = null;
+ DataTieringManager dataTieringManager;
+ try {
+ dataTieringManager = DataTieringManager.getInstance();
+ allFilePaths = dataTieringManager.getAllFilesList();
+ } catch (IllegalStateException e) {
+ // Data-Tiering manager has not been set up.
+ // Ignore the error and proceed with the normal flow.
+ LOG.error("Error while getting DataTieringManager instance: {}", e.getMessage());
+ }
+
 for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
 BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
- BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
- protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+
+ BlockCacheKey key;
+
+ if (allFilePaths != null && allFilePaths.containsKey(protoKey.getHfilename())) {
+ key = new BlockCacheKey(allFilePaths.get(protoKey.getHfilename()), protoKey.getOffset(),
+ protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+ } else {
+ key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+ protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+ }
 BucketCacheProtos.BucketEntry protoValue = entry.getValue();
 // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
 // which created by RpcServer elegantly.
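The fromPB() change above can be summarised in isolation: persisted backing-map entries only carry an HFile name, so on startup each key is rebuilt with the file's current path when the name resolves through the online store files, and falls back to a name-only key otherwise. The following is a minimal, self-contained Java sketch of that idea and is not part of the patch; FilePath and CacheKey are hypothetical stand-ins for org.apache.hadoop.fs.Path and BlockCacheKey, used only to keep the example runnable on its own.

import java.util.HashMap;
import java.util.Map;

public class RestoredKeySketch {
  // Hypothetical stand-ins for Path and BlockCacheKey.
  record FilePath(String uri) {}
  record CacheKey(String hfileName, long offset, FilePath path) {}

  // Prefer a path-aware key when the persisted name maps to an online store file;
  // otherwise fall back to a name-only key (path stays null), mirroring fromPB().
  static CacheKey restoreKey(String persistedName, long offset, Map<String, FilePath> nameToPath) {
    FilePath path = nameToPath.get(persistedName); // null if the file is no longer online
    return new CacheKey(persistedName, offset, path);
  }

  public static void main(String[] args) {
    Map<String, FilePath> nameToPath = new HashMap<>();
    nameToPath.put("f1", new FilePath("/hbase/data/default/t1/r1/cf/f1"));
    System.out.println(restoreKey("f1", 0L, nameToPath));   // path-aware key
    System.out.println(restoreKey("gone", 0L, nameToPath)); // fallback, path is null
  }
}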
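Similarly, the freeSpace() change earlier in this patch orders eviction so that blocks of cold files are dropped first. The toy model below illustrates only that ordering and is not taken from the patch: a plain map stands in for the backing map, and the coldFileResolver callback stands in for DataTieringManager.getColdDataFiles().

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ColdFirstEvictionSketch {
  // Toy backing map: HFile name -> number of cached blocks for that file.
  private final Map<String, Integer> blocksPerFile = new HashMap<>();

  void cacheBlock(String hfileName) {
    blocksPerFile.merge(hfileName, 1, Integer::sum);
  }

  // A null result from the resolver means no tiering manager is available,
  // in which case this step is skipped, mirroring the patched freeSpace().
  void freeSpace(Function<Set<String>, Set<String>> coldFileResolver) {
    Set<String> coldFiles = coldFileResolver.apply(blocksPerFile.keySet());
    if (coldFiles != null) {
      coldFiles.forEach(blocksPerFile::remove); // drop every block of each cold file first
    }
    // ... the usual size-accounting eviction would follow here ...
  }

  public static void main(String[] args) {
    ColdFirstEvictionSketch cache = new ColdFirstEvictionSketch();
    cache.cacheBlock("hot-file");
    cache.cacheBlock("cold-file");
    cache.freeSpace(names -> Set.of("cold-file"));
    System.out.println(cache.blocksPerFile); // {hot-file=1}
  }
}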
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index 0bc04ddc428b..28f0ddeebca2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.hbase.regionserver;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -219,4 +220,24 @@ private long getDataTieringHotDataAge(Configuration conf) {
 return Long.parseLong(
 conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
 }
+
+ /**
+ * Browses through all the online regions and returns a map of all file names
+ * pointing to their paths.
+ * @return a map from file name to file path
+ */
+ public Map<String, Path> getAllFilesList() {
+ Map<String, Path> allFileList = new HashMap<>();
+ for (HRegion r : this.onlineRegions.values()) {
+ for (HStore hStore : r.getStores()) {
+ Configuration conf = hStore.getReadOnlyConfiguration();
+ for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+ String hFileName =
+ hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+ allFileList.put(hFileName, hStoreFile.getPath());
+ }
+ }
+ }
+ return allFileList;
+ }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e4da74f78cf2..f55d35e69cac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -530,8 +530,8 @@ public HRegionServer(final Configuration conf) throws IOException {
 regionServerAccounting = new RegionServerAccounting(conf);
- blockCache = BlockCacheFactory.createBlockCache(conf);
 DataTieringManager.instantiate(onlineRegions);
+ blockCache = BlockCacheFactory.createBlockCache(conf);
 mobFileCache = new MobFileCache(conf);
 rsSnapshotVerifier = new RSSnapshotVerifier(conf);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
index b9bca1ba6b4e..413e3607345c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -145,7 +146,7 @@ private CombinedBlockCache createCombinedBlockCache() {
 public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int expectedL2Miss)
 throws Exception {
 CombinedBlockCache blockCache = createCombinedBlockCache();
- BlockCacheKey key = new BlockCacheKey("key1", 0, false, type);
+ BlockCacheKey key = new BlockCacheKey(new Path("key1"), 0, false, type);
 int size = 100;
 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
 byte[] byteArr = new byte[length];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index afb5862a8a46..91497bbd3675 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -17,7 +17,11 @@
 */
 package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_THREADS;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import java.io.IOException;
@@ -42,11 +46,12 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -92,13 +97,22 @@ public class TestDataTieringManager {
 private static DataTieringManager dataTieringManager;
 private static List<HStoreFile> hStoreFiles;
+ final static long capacitySize = 32 * 1024 * 1024;
+ final static int writeThreads = DEFAULT_WRITER_THREADS;
+ final static int writerQLen = DEFAULT_WRITER_QUEUE_ITEMS;
+ final static int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
 @BeforeClass
 public static void setupBeforeClass() throws Exception {
 testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
 defaultConf = TEST_UTIL.getConfiguration();
 fs = HFileSystem.get(defaultConf);
- BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf);
+ fs.mkdirs(testDir);
+
+ BlockCache blockCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
 cacheConf = new CacheConfig(defaultConf, blockCache);
+
 setupOnlineRegions();
 DataTieringManager.instantiate(testOnlineRegions);
 dataTieringManager = DataTieringManager.getInstance();
@@ -218,6 +232,49 @@ public void testColdDataFiles() {
 }
 }
+ @Test
+ public void testAllDataFiles() {
+ Set<BlockCacheKey> allCachedBlocks = new HashSet<>();
+ for (HStoreFile file : hStoreFiles) {
+ allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
+ }
+ Map<String, Path> allFilePaths = dataTieringManager.getAllFilesList();
+ assertEquals(hStoreFiles.size(), allFilePaths.size());
+ }
+
+ public void testAllDataFilesAfterRestart() throws Exception {
+ Set<BlockCacheKey> cacheKeys = new HashSet<>();
+ // Create cache keys.
+ for (HStoreFile file : hStoreFiles) {
+ cacheKeys.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
+ }
+ // Create dummy data to be cached.
+ CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+ BucketCache cache = (BucketCache) cacheConf.getBlockCache().get();
+ int blocksIter = 0;
+ for (BlockCacheKey key : cacheKeys) {
+ cache.cacheBlock(key, blocks[blocksIter++].getBlock());
+ // Ensure that the block is persisted to the file.
+ while (!cache.getBackingMap().containsKey(key)) {
+ Thread.sleep(100);
+ }
+ }
+
+ // Shutting down the cache persists the backing map to disk.
+ cache.shutdown();
+
+ // Create a new cache populated from disk, which simulates a server restart.
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+ Set<BlockCacheKey> keySet = newBucketCache.getBackingMap().keySet();
+ assertEquals(hStoreFiles.size(), keySet.size());
+ for (BlockCacheKey key : keySet) {
+ assertNotNull(key.getFilePath());
+ }
+ }
+
 private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
 boolean expectedResult, DataTieringException exception) {
 try {